# Source code for vt.iterator

# Copyright © 2019 The vt-py authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from .object import Object


# Public names exported by a wildcard import of this module.
__all__ = ['Iterator']


class Iterator:
    """Iterator allows iterating over object collections.

    Some endpoints in the VirusTotal API represent a collection of objects,
    for example:

    `/files/{id}/comments <https://developers.virustotal.com/v3.0/reference#files-comments-get>`_

    `/intelligence/search <https://developers.virustotal.com/v3.0/reference#intelligence-search>`_

    These collections can be iterated using an instance of this class. Learn
    more about collections in the VirusTotal API in:
    https://developers.virustotal.com/v3.0/reference#collections

    The following example iterates over the most recent 200 comments,
    retrieving them in batches of 20:

    >>> client = vt.Client(<apikey>)
    >>> it = client.iterator('/comments', batch_size=20, limit=200)
    >>> for comment in it:
    >>>   print(comment.text)
    >>> print(it.cursor)

    When the iteration is done, it prints the iterator's cursor. The cursor
    can be used for creating another iterator that continues at the point
    where the previous iterator left.

    The Iterator class also exposes an async iterator:

    >>> # Define an async coroutine that iterates over the comments.
    >>> async def print_comments():
    >>>   async for comment in client.iterator('/comments', limit=200):
    >>>     print(comment.id)
    >>> # Run the print_comments coroutine using asyncio
    >>> import asyncio
    >>> asyncio.get_event_loop().run_until_complete(print_comments())
    """

    def __init__(self, client, path, params=None, cursor=None,
                 limit=0, batch_size=0):
        """Initializes an iterator.

        This function is not intended to be called directly.
        Client.iterator() is the preferred way for creating an iterator.

        Args:
          client: Object used for issuing the requests; its `get_json` and
            `get_json_async` methods are called with `path` and the request
            parameters.
          path: Path of the collection to iterate over.
          params: Optional dict with additional request parameters. It must
            not contain the keys "cursor" or "limit".
          cursor: Optional string of the form "<server cursor>-<offset>", as
            returned by the `cursor` property of a previous iterator.
          limit: Maximum number of objects to yield. A falsy value (0 or
            None) means that there is no limit.
          batch_size: Number of objects requested per batch. When it is 0
            the "limit" request parameter is not sent.

        Raises:
          ValueError: If `params` contains "cursor" or "limit", or if
            `cursor` is malformed.
        """
        self._client = client
        self._path = path
        self._params = params or {}
        self._batch_size = batch_size
        self._limit = limit
        self._items = []
        self._count = 0
        self._server_cursor = None
        self._batch_cursor = 0

        # These two parameters are managed by the iterator itself.
        if 'cursor' in self._params:
            raise ValueError('Do not pass "cursor" as a path param')
        if 'limit' in self._params:
            raise ValueError('Do not pass "limit" as a path param')

        if cursor:
            # Split on the LAST dash: the server cursor may contain dashes.
            self._server_cursor, _, batch_cursor = cursor.rpartition('-')
            if not self._server_cursor:
                raise ValueError('invalid cursor')
            try:
                self._batch_cursor = int(batch_cursor)
            except ValueError as err:
                raise ValueError('invalid cursor') from err
            # NOTE(review): the offset parsed here is stored but the first
            # batch is fetched with the default offset 0, so it is not used
            # for skipping items when resuming -- confirm the intended
            # cursor semantics before changing this.

    def _build_params(self):
        """Returns a copy of the user params plus cursor and batch size."""
        params = self._params.copy()
        if self._server_cursor:
            params['cursor'] = self._server_cursor
        if self._batch_size:
            params['limit'] = self._batch_size
        return params

    def _parse_response(self, json_resp, batch_cursor):
        """Extracts the items and the next server cursor from a response.

        Args:
          json_resp: Dict with the decoded JSON response.
          batch_cursor: Number of leading items in "data" to skip.

        Returns:
          A tuple (items, cursor) where cursor is the value of
          json_resp["meta"]["cursor"], or None if it is not present.

        Raises:
          ValueError: If json_resp["data"] is not a list.
        """
        if not isinstance(json_resp.get('data'), list):
            raise ValueError('{} is not a collection'.format(self._path))
        meta = json_resp.get('meta', {})
        items = json_resp['data'][batch_cursor:]
        return items, meta.get('cursor')

    async def _get_batch_async(self, batch_cursor=0):
        """Asynchronously fetches a batch; see `_parse_response`."""
        json_resp = await self._client.get_json_async(
            self._path, params=self._build_params())
        return self._parse_response(json_resp, batch_cursor)

    def _get_batch(self, batch_cursor=0):
        """Fetches a batch; see `_parse_response` for the return value."""
        json_resp = self._client.get_json(
            self._path, params=self._build_params())
        return self._parse_response(json_resp, batch_cursor)

    def _has_more(self):
        """Returns True while there is something left to yield or fetch."""
        # A falsy limit (0 or None) means "no limit". Comparing the count
        # against a limit of 0 would end the iteration before yielding
        # anything.
        if self._limit and self._count >= self._limit:
            return False
        return bool(self._items or self._server_cursor)

    def _next_object(self):
        """Pops the next buffered item and returns it as an Object."""
        item = self._items.pop(0)
        self._count += 1
        self._batch_cursor += 1
        return Object.from_dict(item)

    def __iter__(self):
        """Yields the objects in the collection, fetching batches lazily."""
        self._items, self._server_cursor = self._get_batch()
        while self._has_more():
            if self._items:
                yield self._next_object()
            else:
                # Current batch exhausted, ask the server for the next one.
                self._items, self._server_cursor = self._get_batch()
                self._batch_cursor = 0

    async def __aiter__(self):
        """Asynchronous counterpart of `__iter__`."""
        self._items, self._server_cursor = await self._get_batch_async()
        while self._has_more():
            if self._items:
                yield self._next_object()
            else:
                # Current batch exhausted, ask the server for the next one.
                self._items, self._server_cursor = (
                    await self._get_batch_async())
                self._batch_cursor = 0

    @property
    def cursor(self):
        """Cursor indicating the last returned object.

        This cursor can be used for creating a new iterator that continues
        where the current one left. It is None when there is no server
        cursor.
        """
        if not self._server_cursor:
            return None
        return self._server_cursor + '-' + str(self._batch_cursor)