Welcome, guest | Sign In | My Account | Store | Cart
"""Utility functions to deal with HTTP stream: dechunking and decompressing
body etc.
"""

__author__ = 'vovanec@gmail.com'


import bz2
import functools
import types
import zlib


CHUNK_SIZE = 1024 * 16

GZIP = 'gzip'
DEFLATE = 'deflate'
BZIP2 = 'bzip2'
SUPPORTED_COMPRESSIONS = {GZIP, DEFLATE, BZIP2}

DECOMPRESSOR_FACTORIES = {
    DEFLATE: functools.partial(zlib.decompressobj, -zlib.MAX_WBITS),
    GZIP: functools.partial(zlib.decompressobj, 16 + zlib.MAX_WBITS),
    BZIP2: bz2.BZ2Decompressor
}


class BodyStreamError(Exception):

    """Exception of this class is raised when HTTP stream could not be read.
    """

    pass


class DechunkError(BodyStreamError):

    """Raised when could not de-chunk stream.
    """

    pass


class DecompressError(BodyStreamError):

    """Raised when could not decompress stream.
    """

    pass


def read_until(stream, delimiter, max_bytes=16):
    """Read until we have found the given delimiter.
    :param file stream: readable file-like object.
    :param bytes delimiter: delimiter string.
    :param int max_bytes: maximum bytes to read.
    :rtype: bytes|None
    """

    buf = bytearray()
    delim_len = len(delimiter)

    while len(buf) < max_bytes:
        c = stream.read(1)

        if not c:
            break

        buf += c
        if buf[-delim_len:] == delimiter:
            return bytes(buf[:-delim_len])


def dechunk(stream):
    """De-chunk HTTP body stream.
    :param file stream: readable file-like object.
    :rtype: __generator[bytes]
    :raise: DechunkError
    """

    # TODO(vovan): Add support for chunk extensions:
    # TODO(vovan): http://tools.ietf.org/html/rfc2616#section-3.6.1

    while True:
        chunk_len = read_until(stream, b'\r\n')

        if chunk_len is None:
            raise DechunkError(
                'Could not extract chunk size: unexpected end of data.')

        try:
            chunk_len = int(chunk_len.strip(), 16)
        except (ValueError, TypeError) as err:
            raise DechunkError('Could not parse chunk size: %s' % (err,))

        if chunk_len == 0:
            break

        bytes_to_read = chunk_len
        while bytes_to_read:
            chunk = stream.read(bytes_to_read)
            bytes_to_read -= len(chunk)
            yield chunk

        # chunk ends with \r\n
        crlf = stream.read(2)
        if crlf != b'\r\n':
            raise DechunkError('No CR+LF at the end of chunk!')


def to_chunks(stream_or_generator):
    """This generator function receives file-like or generator as input
    and returns generator.
    :param file|__generator[bytes] stream_or_generator: readable stream or
           generator.
    :rtype: __generator[bytes]
    :raise: TypeError
    """

    if isinstance(stream_or_generator, types.GeneratorType):
        yield from stream_or_generator
    elif hasattr(stream_or_generator, 'read'):
        while True:
            chunk = stream_or_generator.read(CHUNK_SIZE)
            if not chunk:
                break  # no more data

            yield chunk

    else:
        raise TypeError('Input must be either readable or generator.')


def decompress(chunks, compression):
    """Decompress
    :param __generator[bytes] chunks: compressed body chunks.
    :param str compression: compression constant.
    :rtype: __generator[bytes]
    :return: decompressed chunks.
    :raise: TypeError, DecompressError
    """

    if compression not in SUPPORTED_COMPRESSIONS:
        raise TypeError('Unsupported compression type: %s' % (compression,))

    try:
        de_compressor = DECOMPRESSOR_FACTORIES[compression]()

        for chunk in chunks:
            try:
                yield de_compressor.decompress(chunk)
            except OSError as err:
                # BZ2Decompressor: invalid data stream
                raise DecompressError(err) from None

        # BZ2Decompressor does not support flush() interface.
        if hasattr(de_compressor, 'flush'):
            yield de_compressor.flush()

    except zlib.error as err:
        raise DecompressError(err) from None


def read_body_stream(stream, chunked=False, compression=None):
    """Read HTTP body stream, yielding blocks of bytes. De-chunk and
    de-compress data if needed.
    :param file stream: readable stream.
    :param bool chunked: whether stream is chunked.
    :param str|None compression: compression type is stream is
           compressed, otherwise None.
    :rtype: __generator[bytes]
    :raise: TypeError, BodyStreamError
    """

    if not (chunked or compression):
        return to_chunks(stream)

    generator = stream
    if chunked:
        generator = dechunk(generator)

    if compression:
        generator = decompress(to_chunks(generator), compression)

    return generator

Diff to Previous Revision

--- revision 1 2015-06-30 03:29:39
+++ revision 2 2015-06-30 03:31:31
@@ -78,8 +78,8 @@
     :raise: DechunkError
     """
 
-    # TODO(vkuznet): Add support for chunk extensions:
-    # TODO(vkuznet): http://tools.ietf.org/html/rfc2616#section-3.6.1
+    # TODO(vovan): Add support for chunk extensions:
+    # TODO(vovan): http://tools.ietf.org/html/rfc2616#section-3.6.1
 
     while True:
         chunk_len = read_until(stream, b'\r\n')

History