# Source code for vt.iterator

# Copyright © 2019 The vt-py authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines an iterator to loop through VT API collections."""

from .object import Object
from .utils import make_sync

# Public names exported on a star-import of this module.
__all__ = ['Iterator']


class Iterator:
    # pylint: disable=line-too-long
    """Iterator allows iterating over object collections.

    Some endpoints in the VirusTotal API represent a collection of objects,
    for example:

      `/files/{id}/comments <https://docs.virustotal.com/reference/files-comments-get>`_

      `/intelligence/search <https://docs.virustotal.com/reference/intelligence-search>`_

    These collections can be iterated using an instance of this class. Learn
    more about collections in the VirusTotal API in:
    https://docs.virustotal.com/reference/collections

    The following example iterates over the most recent 200 comments,
    retrieving them in batches of 20:

    >>> client = vt.Client(<apikey>)
    >>> it = client.iterator('/comments', batch_size=20, limit=200)
    >>> for comment in it:
    >>>   print(comment.text)
    >>> print(it.cursor)

    When the iteration is done, it prints the iterator's cursor. The cursor
    can be used for creating another iterator that continues at the point
    where the previous iterator left.

    The Iterator class also exposes an async iterator:

    >>> # Define an async coroutine that iterates over the comments.
    >>> async def print_comments():
    >>>   async for comment in client.iterator('/comments', limit=200):
    >>>     print(comment.id)
    >>> # Run the print_comments coroutine using asyncio
    >>> import asyncio
    >>> asyncio.get_event_loop().run_until_complete(print_comments())
    """
    # pylint: disable=line-too-long

    def __init__(self, client, path, params=None, cursor=None,
                 limit=None, batch_size=0):
        """Initializes an iterator.

        This function is not intended to be called directly.
        Client.iterator() is the preferred way for creating an iterator.

        Args:
          client: Object providing the ``get_json_async(path, params=...)``
            coroutine used for fetching every batch.
          path: Path of the collection, passed verbatim to the client.
          params: Optional dict with extra request parameters. It must not
            contain the ``cursor`` nor the ``limit`` keys, which are managed
            by the iterator itself.
          cursor: Optional cursor string, in the ``<server cursor>-<offset>``
            format produced by the ``cursor`` property.
          limit: Maximum number of objects returned by the iterator. A falsy
            value means that there is no limit.
          batch_size: Value sent as the ``limit`` request parameter on every
            request. A falsy value leaves the parameter out.

        Raises:
          ValueError: If ``params`` contains ``cursor`` or ``limit``, or if
            ``cursor`` is not a well-formed cursor.
        """
        self._client = client
        self._path = path
        self._params = params or {}
        self._batch_size = batch_size
        self._limit = limit
        self._items = []  # Items of the current batch not yet returned.
        self._count = 0  # Number of objects returned so far.
        self._server_cursor = None  # Cursor sent on the next request.
        self._batch_cursor = 0  # Items consumed from the current batch.
        self._meta = None  # Meta info of the last response, sans cursor.

        if 'cursor' in self._params:
            raise ValueError('Do not pass "cursor" as a path param')
        if 'limit' in self._params:
            raise ValueError('Do not pass "limit" as a path param')

        if cursor:
            # The offset is whatever follows the last dash; everything before
            # it is the opaque cursor understood by the server.
            self._server_cursor, _, batch_cursor = cursor.rpartition('-')
            if not self._server_cursor:
                raise ValueError('invalid cursor')
            try:
                self._batch_cursor = int(batch_cursor)
            except ValueError as exc:
                raise ValueError('invalid cursor') from exc
            # NOTE(review): the offset parsed here is validated and stored,
            # but no fetch passes it to _get_batch_async(), so it does not
            # skip any item when resuming -- confirm this is intended.

    def _build_params(self):
        """Returns the request parameters for fetching the next batch.

        The result is a copy of the user-provided parameters, extended with
        the server cursor and the batch size when they are set.
        """
        params = self._params.copy()
        if self._server_cursor:
            params['cursor'] = self._server_cursor
        if self._batch_size:
            params['limit'] = self._batch_size
        return params

    def _parse_response(self, json_resp, batch_cursor):
        """Extracts the items and meta information from a JSON response.

        Args:
          json_resp: Dict with the decoded response.
          batch_cursor: Number of leading items in ``data`` to discard.

        Returns:
          An ``(items, meta)`` tuple, where ``meta`` is an empty dict if the
          response has no ``meta`` key.

        Raises:
          ValueError: If the ``data`` key is missing or is not a list.
        """
        if not isinstance(json_resp.get('data'), list):
            raise ValueError(f'{self._path} is not a collection')
        meta = json_resp.get('meta', {})
        items = json_resp['data'][batch_cursor:]
        return items, meta

    async def _get_batch_async(self, batch_cursor=0):
        """Requests one batch and returns its ``(items, meta)`` tuple."""
        json_resp = await self._client.get_json_async(
            self._path, params=self._build_params())
        return self._parse_response(json_resp, batch_cursor)

    async def _load_batch_async(self):
        """Fetches one batch and makes it the iterator's current batch."""
        self._items, self._meta = await self._get_batch_async()
        # The cursor is taken out of the meta information because it is
        # exposed through its own property.
        self._server_cursor = self._meta.pop('cursor', None)
        self._batch_cursor = 0

    def __iter__(self):
        return self

    def __aiter__(self):
        return self

    def __next__(self):
        """Synchronous counterpart of ``__anext__``."""
        try:
            return make_sync(self.__anext__())
        except StopAsyncIteration as exc:
            raise StopIteration() from exc

    async def __anext__(self):
        """Returns the next object in the collection.

        Raises:
          StopAsyncIteration: If the limit was reached or the collection has
            no more items.
        """
        if self._limit and self._count == self._limit:
            raise StopAsyncIteration()
        # A new batch is needed when the current one is exhausted and either
        # nothing has been requested yet or the server announced more pages.
        if not self._items and (self.cursor or self._count == 0):
            await self._load_batch_async()
            # A page can come back empty while still carrying a cursor. Keep
            # following the cursor instead of popping from an empty list.
            while not self._items and self._server_cursor:
                await self._load_batch_async()
        if not self._items:
            raise StopAsyncIteration()
        item = self._items.pop(0)
        self._count += 1
        self._batch_cursor += 1
        return Object.from_dict(item)

    @property
    def cursor(self):
        """Cursor indicating the last returned object.

        This cursor can be used for creating a new iterator that continues
        where the current one left. It is None if no object has been returned
        yet or if there is no server cursor.
        """
        if not self._server_cursor or not self._count:
            return None
        return self._server_cursor + '-' + str(self._batch_cursor)

    @property
    async def meta_async(self):
        """Meta information.

        The cursor is not included, as it's exposed as a property.
        """
        if self._meta is None:
            # Load the first batch of items in order to retrieve the meta
            # info.
            await self._load_batch_async()
        return self._meta

    @property
    def meta(self):
        """Synchronous counterpart of ``meta_async``."""
        return make_sync(self.meta_async)