Source code for aiothrottle.throttle

"""
Throttle: Manages rate limiting for general connections
ThrottledStreamReader: Throttles aiohttp downloads
"""

import asyncio
import functools
import logging

import aiohttp


LOGGER = logging.getLogger(__package__)


class Throttle:
    """Throttle for IO operations

    As soon as an IO action is registered using :meth:`add_io`,
    :meth:`time_left` returns the seconds to wait until
    ``[byte count] / [time passed] = [rate limit]``. After that,
    :meth:`reset_io` has to be called to measure the new rate.

    :param int limit: the limit in bytes to read/write per second
    :raises: :class:`ValueError`: invalid rate given
    """

    def __init__(self, limit, loop=None):
        self._limit = 0
        self.limit = limit
        self._io = 0
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._reset_time = loop.time()

    @property
    def limit(self):
        """
        :returns: the limit in bytes to read/write per second
        :rtype: int
        """
        return self._limit

    @limit.setter
    def limit(self, value):
        """
        :param int value: the limit in bytes to read/write per second
        :raises: :class:`ValueError`: invalid rate given
        """
        if value <= 0:
            raise ValueError("rate_limit has to be greater than 0")
        self._limit = value

    def time_left(self):
        """returns the number of seconds left until the rate limit is reached

        :returns: seconds left until the rate limit is reached
        :rtype: float
        """
        remaining = self._io / self._limit
        LOGGER.debug("[throttle] time remaining: %.3f", remaining)
        return remaining

    def add_io(self, byte_count):
        """registers a number of bytes read/written

        :param int byte_count: number of bytes to add to the current rate
        """
        self._io += byte_count
        LOGGER.debug(
            "[throttle] added bytes: %d, now: %d", byte_count, self._io)

    def reset_io(self):
        """resets the registered IO actions"""
        self._io = 0
        self._reset_time = self._loop.time()
        LOGGER.debug("[throttle] reset IO")

    @asyncio.coroutine
    def wait_remaining(self):
        """waits until the rate limit is reached"""
        time_left = self.time_left()
        LOGGER.debug("[throttle] sleeping for %.3f seconds", time_left)
        yield from asyncio.sleep(time_left)

    def current_rate(self):
        """returns the current rate, measured since :meth:`reset_io`

        :returns: the current rate in bytes per second
        :rtype: float
        :raises: :class:`RuntimeError`: no time has passed since the
            last reset, so the rate cannot be measured
        """
        if self._io <= 0:
            return 0
        now = self._loop.time()
        duration = now - self._reset_time
        if duration <= 0:
            raise RuntimeError("unable to measure rate, duration <= 0")
        rate = self._io / duration
        LOGGER.debug("[throttle] measured current rate: %.3f B/s", rate)
        return rate

    def within_limit(self):
        """returns the current limitation state

        :returns: ``True`` if the current rate is below the limit rate
        :rtype: bool
        """
        current = self.current_rate()
        within_limit = current < self._limit
        LOGGER.debug(
            "[throttle] %s rate", "within" if within_limit else "not within")
        return within_limit
[docs] """Throttling, flow controlling :class:`aiohttp.streams.StreamReader` for :meth:`aiohttp.request` Usage: >>> import functools >>> import aiohttp >>> import aiothrottle >>> kbps = 200 >>> partial = functools.partial( >>> aiothrottle.ThrottledStreamReader, rate_limit=kbps * 1024) >>> aiohttp.client_reqrep.ClientResponse.flow_control_class = partial :param aiohttp.parsers.StreamParser stream: the base stream :param int rate_limit: the rate limit in bytes per second :param int buffer_limit: the internal buffer limit in bytes :param asyncio.BaseEventLoop loop: the asyncio event loop :param tuple args: arguments passed through to StreamReader :param dict kwargs: keyword arguments passed through to StreamReader """ def __init__( self, stream, rate_limit, buffer_limit=2**16, loop=None, *args, **kwargs): super().__init__(loop=loop, *args, **kwargs) self._loop = loop or asyncio.get_event_loop() self._throttle = Throttle(rate_limit, self._loop) self._stream = stream self._b_limit = buffer_limit * 2 self._b_limit_reached = False self._check_handle = None self._throttling = True # resume transport reading if stream.paused: try: stream.transport.resume_reading() except AttributeError: pass def __del__(self): if self._check_handle is not None: self._check_handle.cancel() @property def rate_limit(self): """ :returns: the current rate limit :rtype: int .. versionadded:: 0.1.2 """ return self._throttle.limit @property def throttling(self): """ :returns: wether the connection is being throttled :rtype: bool .. versionadded:: 0.1.2 """ return self._throttling def limit_rate(self, limit):
[docs] """Sets the rate limit of this response :param limit: the limit in bytes to read/write per second .. versionadded:: 0.1.1 """ self._throttle.limit = limit self._throttling = True def unlimit_rate(self):
[docs] """Unlimits the rate of this response .. versionadded:: 0.1.1 """ self._throttling = False if len(self._buffer) < self._b_limit: self._try_resume() def _try_pause(self):
"""Pauses the transport if not already paused""" if self._stream.paused: return try: self._stream.transport.pause_reading() except AttributeError: pass else: self._stream.paused = True LOGGER.debug("[reader] paused") def _try_resume(self): """Resumed the transport if paused""" if not self._stream.paused: return try: self._stream.transport.resume_reading() except AttributeError: pass else: self._stream.paused = False LOGGER.debug("[reader] resumed") def feed_data(self, data, _=0):
[docs] """Feeds data into the internal buffer""" LOGGER.debug("[reader] got fed %d bytes", len(data)) super().feed_data(data) self._throttle.reset_io() self._throttle.add_io(len(data)) self._check_limits() def _check_callback(self):
"""Tries to resume the transport after the rate limit is reached""" self._check_handle = None self._try_resume() def _schedule_resume(self): """resumes the transport as soon as the rate limit is reached""" # resume as soon as the target rate is reached if self._check_handle is not None: self._check_handle.cancel() pause_time = self._throttle.time_left() LOGGER.debug("[reader] resuming in %.3f seconds", pause_time) self._check_handle = self._loop.call_later( pause_time, self._check_callback) @asyncio.coroutine def _check_buffer_limit(self): """Controls the size of the internal buffer""" buf_size = len(self._buffer) if self._stream.paused: try: within_limit = self._throttle.within_limit() except RuntimeError: # not enough time has passed since feed_data() yield from asyncio.sleep(.001) yield from self._check_buffer_limit() return resume = ( buf_size < self._b_limit and ( not self._throttling or within_limit)) if resume: LOGGER.debug("[reader] resuming throttling") self._try_resume() self._b_limit_reached = False else: self._schedule_resume() else: # read() only reduces buffer size, # feed_data() pauses on full buffer, # so this can only be reached on # non-full buffer assert buf_size <= self._b_limit def _check_limits(self): """Controls rate and buffer size by pausing/resuming the transport""" if self._check_handle is not None: self._check_handle.cancel() self._check_handle = None buf_size = len(self._buffer) if not self._throttling: # only watch the buffer limit if ( self._stream.paused and buf_size < self._b_limit): LOGGER.debug("[reader] resuming unthrottling") self._try_resume() return self._try_pause() # watch the buffer limit if buf_size >= self._b_limit: LOGGER.debug("[reader] byte limit reached, not resuming") self._b_limit_reached = True return self._schedule_resume() @asyncio.coroutine def read(self, byte_count=-1):
[docs] """Reads at most the requested number of bytes from the internal buffer :param int byte_count: the number of bytes to read :returns: the data :rtype: bytes """ LOGGER.debug("[reader] reading %d bytes", byte_count) data = yield from super().read(byte_count) yield from self._check_buffer_limit() return data @asyncio.coroutine
def readline(self):
[docs] """Reads bytes from the internal buffer until ``\\n`` is found :returns: the data :rtype: bytes """ LOGGER.debug("[reader] reading line") data = yield from super().readline() yield from self._check_buffer_limit() return data @asyncio.coroutine
def readany(self):
[docs] """Reads the bytes next received from the internal buffer :returns: the data :rtype: bytes """ LOGGER.debug("[reader] reading anything") data = yield from super().readany() yield from self._check_buffer_limit() return data @asyncio.coroutine
def readexactly(self, byte_count):
[docs] """Reads the requested number of bytes from the internal buffer This raises :class:`asyncio.IncompleteReadError` if the stream ended before enough bytes were received :param int byte_count: the number of bytes to read :returns: the data :rtype: bytes """ LOGGER.debug("[reader] reading exactly %d bytes", byte_count) data = yield from super().readexactly(byte_count) yield from self._check_buffer_limit() return data def limit_rate(limit):
[docs] """Limits the rate of all subsequent aiohttp requests :param limit: the limit in bytes to read/write per second .. versionadded:: 0.1.1 """ partial = functools.partial( ThrottledStreamReader, rate_limit=limit) aiohttp.client_reqrep.ClientResponse.flow_control_class = partial def unlimit_rate():
[docs] """Unlimits the rate of all subsequent aiohttp requests .. versionadded:: 0.1.1 """ aiohttp.client_reqrep.ClientResponse.flow_control_class = ( aiohttp.streams.FlowControlStreamReader)