#!/usr/local/bin/python
# Copyright © 2019 The vt-py authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Processes items from VT API feeds."""
from datetime import datetime
from datetime import timedelta
import enum
import io
import json
import asyncio
import bz2
from .error import APIError
from .object import Object
from .utils import make_sync
__all__ = [
'Feed',
'FeedType']
[docs]class FeedType(enum.Enum):
"""Feed types."""
FILES = 'files'
URLS = 'urls'
FILE_BEHAVIOURS = 'file-behaviours'
[docs]class Feed:
"""Feed represents a stream of objects received from VirusTotal in real-time.
For more information about VirusTotal Feeds see:
https://docs.virustotal.com/reference/feeds
In the example below the loop iterates forever, retrieving file objects as
they are processed by VirusTotal. For a more elaborate example see the file
examples/file_feed.py in this repository.
>>> with vt.Client(<apikey>) as client:
>>> for file_obj in client.feed(vt.FeedType.FILES):
>>> print(file_obj.id)
Instances of this class are not created directly, you should use the
:func:`vt.Client.feed` method instead.
"""
def __init__(self, client, feed_type, cursor=None):
"""Initializes a Feed object.
This function is not intended to be called directly. Client.feed() is
the preferred way for creating a feed.
"""
self._client = client
self._type = feed_type
self._batch = None
self._count = 0
# This class tolerates a given number of consecutive missing batches in
# the feed. If self._missing_batches_tolerancy is set to 0, there's no
# tolerancy for missing batches and even a single missing batch will
# cause an error. However, missing batches can occur from time to time.
self._missing_batches_tolerancy = 1
if cursor:
batch_time, _, batch_skip = cursor.partition('-')
self._batch_time = datetime.strptime(batch_time, '%Y%m%d%H%M')
self._batch_skip = int(batch_skip) if batch_skip else 0
else:
self._batch_time = datetime.utcnow() - timedelta(minutes=70)
self._batch_skip = 0
self._next_batch_time = self._batch_time
async def _get_batch_async(self, batch_time):
""""Retrieves a specific batch from the backend.
There's one batch per minute, each identified by the date in YYYYMMDDhhmm
format. The batch_time argument is a datetime object that is converted to
this format, the seconds in the datetime are ignored.
"""
while True:
response = await self._client.get_async(
f'/feeds/{self._type.value}/{batch_time.strftime("%Y%m%d%H%M")}')
error = await self._client.get_error_async(response)
if not error:
break
if error.code == 'NotAvailableYet':
await asyncio.sleep(60)
else:
raise error
return io.BytesIO(bz2.decompress(await response.content.read_async()))
async def _get_next_batch_async(self):
"""Retrieves the next batch from the feed.
This function tolerates a certain number of missing batches. If some batch
is missing the next one will be retrieved. If the number of missing
batches is greater than the tolerancy set, the function raises an error.
"""
missing_batches = 0
while True:
try:
self._batch_time = self._next_batch_time
self._next_batch_time += timedelta(seconds=60)
self._batch = await self._get_batch_async(self._batch_time)
self._batch_cursor = 0
break
except APIError as error:
# The only acceptable error here is NotFoundError, if such an error
# occurs we try to get the next batch.
if error.code != 'NotFoundError':
raise error
missing_batches += 1
if missing_batches > self._missing_batches_tolerancy:
raise error
def _skip(self, n):
for _ in range(n):
self._batch.readline()
self._batch_cursor += 1
def __iter__(self):
return self
def __aiter__(self):
return self
def __next__(self):
try:
return make_sync(self.__anext__())
except StopAsyncIteration as exc:
raise StopIteration() from exc
async def __anext__(self):
while True:
if not self._batch:
await self._get_next_batch_async()
self._skip(self._batch_skip)
self._batch_skip = 0
next_item = self._batch.readline()
if next_item:
self._count += 1
self._batch_cursor += 1
return Object.from_dict(json.loads(next_item.decode('utf-8')))
else:
self._batch = None
@property
def cursor(self):
"""Returns a cursor indicating the last item retrieved from the feed.
This cursor can be used for creating a new Feed object that continues where
a previous one left.
"""
return self._batch_time.strftime('%Y%m%d%H%M-') + str(self._batch_cursor)