"""Utility functions to deal with HTTP stream: dechunking and decompressing body etc. """ __author__ = 'vovanec@gmail.com' import bz2 import functools import types import zlib CHUNK_SIZE = 1024 * 16 GZIP = 'gzip' DEFLATE = 'deflate' BZIP2 = 'bzip2' SUPPORTED_COMPRESSIONS = {GZIP, DEFLATE, BZIP2} DECOMPRESSOR_FACTORIES = { DEFLATE: functools.partial(zlib.decompressobj, -zlib.MAX_WBITS), GZIP: functools.partial(zlib.decompressobj, 16 + zlib.MAX_WBITS), BZIP2: bz2.BZ2Decompressor } class BodyStreamError(Exception): """Exception of this class is raised when HTTP stream could not be read. """ pass class DechunkError(BodyStreamError): """Raised when could not de-chunk stream. """ pass class DecompressError(BodyStreamError): """Raised when could not decompress stream. """ pass def read_until(stream, delimiter, max_bytes=16): """Read until we have found the given delimiter. :param file stream: readable file-like object. :param bytes delimiter: delimiter string. :param int max_bytes: maximum bytes to read. :rtype: bytes|None """ buf = bytearray() delim_len = len(delimiter) while len(buf) < max_bytes: c = stream.read(1) if not c: break buf += c if buf[-delim_len:] == delimiter: return bytes(buf[:-delim_len]) def dechunk(stream): """De-chunk HTTP body stream. :param file stream: readable file-like object. :rtype: __generator[bytes] :raise: DechunkError """ # TODO(vovan): Add support for chunk extensions: # TODO(vovan): http://tools.ietf.org/html/rfc2616#section-3.6.1 while True: chunk_len = read_until(stream, b'\r\n') if chunk_len is None: raise DechunkError( 'Could not extract chunk size: unexpected end of data.') try: chunk_len = int(chunk_len.strip(), 16) except (ValueError, TypeError) as err: raise DechunkError('Could not parse chunk size: %s' % (err,)) if chunk_len == 0: break bytes_to_read = chunk_len while bytes_to_read: chunk = stream.read(bytes_to_read) bytes_to_read -= len(chunk) yield chunk # chunk ends with \r\n crlf = stream.read(2) if crlf != b'\r\n': raise DechunkError('No CR+LF at the end of chunk!') def to_chunks(stream_or_generator): """This generator function receives file-like or generator as input and returns generator. :param file|__generator[bytes] stream_or_generator: readable stream or generator. :rtype: __generator[bytes] :raise: TypeError """ if isinstance(stream_or_generator, types.GeneratorType): yield from stream_or_generator elif hasattr(stream_or_generator, 'read'): while True: chunk = stream_or_generator.read(CHUNK_SIZE) if not chunk: break # no more data yield chunk else: raise TypeError('Input must be either readable or generator.') def decompress(chunks, compression): """Decompress :param __generator[bytes] chunks: compressed body chunks. :param str compression: compression constant. :rtype: __generator[bytes] :return: decompressed chunks. :raise: TypeError, DecompressError """ if compression not in SUPPORTED_COMPRESSIONS: raise TypeError('Unsupported compression type: %s' % (compression,)) try: de_compressor = DECOMPRESSOR_FACTORIES[compression]() for chunk in chunks: try: yield de_compressor.decompress(chunk) except OSError as err: # BZ2Decompressor: invalid data stream raise DecompressError(err) from None # BZ2Decompressor does not support flush() interface. if hasattr(de_compressor, 'flush'): yield de_compressor.flush() except zlib.error as err: raise DecompressError(err) from None def read_body_stream(stream, chunked=False, compression=None): """Read HTTP body stream, yielding blocks of bytes. De-chunk and de-compress data if needed. 
def read_body_stream(stream, chunked=False, compression=None):
    """Read HTTP body stream, yielding blocks of bytes.
    De-chunk and decompress the data if needed.

    :param file stream: readable stream.
    :param bool chunked: whether the stream is chunked.
    :param str|None compression: compression type if the stream is
           compressed, otherwise None.
    :rtype: __generator[bytes]
    :raise: TypeError, BodyStreamError
    """

    if not (chunked or compression):
        return to_chunks(stream)

    generator = stream
    if chunked:
        generator = dechunk(generator)

    if compression:
        generator = decompress(to_chunks(generator), compression)

    return generator
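
# A minimal self-check sketch (an illustrative addition, not part of the
# original module): builds a chunked, gzip-compressed body in memory and
# reads it back through read_body_stream(). The payload and the 100-byte
# chunk size are arbitrary choices for the demo.
if __name__ == '__main__':
    import gzip
    import io

    payload = b'hello, chunked world! ' * 64
    compressed = gzip.compress(payload)

    # Encode the compressed bytes with HTTP/1.1 chunked transfer coding:
    # <size in hex>\r\n<data>\r\n ..., terminated by a zero-length chunk.
    body = bytearray()
    for i in range(0, len(compressed), 100):
        piece = compressed[i:i + 100]
        body += ('%x\r\n' % (len(piece),)).encode('ascii')
        body += piece + b'\r\n'
    body += b'0\r\n\r\n'

    result = b''.join(
        read_body_stream(io.BytesIO(body), chunked=True, compression=GZIP))

    assert result == payload
    print('Round trip OK: %d bytes.' % (len(result),))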