"""Utility functions to deal with HTTP stream: dechunking and decompressing
body etc.
"""


__author__ = 'vovanec@gmail.com'


import bz2
import functools
import types
import zlib


CHUNK_SIZE = 1024 * 16

GZIP = 'gzip'
DEFLATE = 'deflate'
BZIP2 = 'bzip2'
SUPPORTED_COMPRESSIONS = {GZIP, DEFLATE, BZIP2}

DECOMPRESSOR_FACTORIES = {
    DEFLATE: functools.partial(zlib.decompressobj, -zlib.MAX_WBITS),
    GZIP: functools.partial(zlib.decompressobj, 16 + zlib.MAX_WBITS),
    BZIP2: bz2.BZ2Decompressor
}
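
# Note on the zlib "wbits" values above (explanatory comment added here, not
# part of the original module): -zlib.MAX_WBITS selects a raw DEFLATE stream
# with no zlib header or checksum, while 16 + zlib.MAX_WBITS tells zlib to
# expect a gzip header and trailer.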


class BodyStreamError(Exception):

    """Raised when the HTTP body stream could not be read.
    """

    pass


class DechunkError(BodyStreamError):

    """Raised when the stream could not be de-chunked.
    """

    pass


class DecompressError(BodyStreamError):

    """Raised when the stream could not be decompressed.
    """

    pass


def read_until(stream, delimiter, max_bytes=16):
    """Read from the stream until the given delimiter is found.
    :param file stream: readable file-like object.
    :param bytes delimiter: delimiter string.
    :param int max_bytes: maximum number of bytes to read.
    :rtype: bytes|None
    :return: bytes read before the delimiter, or None if the delimiter was
             not found within max_bytes.
    """

    buf = bytearray()
    delim_len = len(delimiter)

    while len(buf) < max_bytes:
        c = stream.read(1)

        if not c:
            break

        buf += c
        if buf[-delim_len:] == delimiter:
            return bytes(buf[:-delim_len])
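

# Illustrative sketch (added for clarity, not part of the original module):
# read_until() returns the bytes before the delimiter, or None when the
# delimiter is not found within max_bytes.
def _example_read_until():
    import io

    assert read_until(io.BytesIO(b'1a\r\nchunk data'), b'\r\n') == b'1a'
    assert read_until(io.BytesIO(b'no delimiter at all'), b'\r\n') is None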


def dechunk(stream):
    """De-chunk HTTP body stream.
    :param file stream: readable file-like object.
    :rtype: __generator[bytes]
    :raise: DechunkError
    """

    # TODO(vovan): Add support for chunk extensions:
    # TODO(vovan): http://tools.ietf.org/html/rfc2616#section-3.6.1

    while True:
        chunk_len = read_until(stream, b'\r\n')

        if chunk_len is None:
            raise DechunkError(
                'Could not extract chunk size: unexpected end of data.')

        try:
            chunk_len = int(chunk_len.strip(), 16)
        except (ValueError, TypeError) as err:
            raise DechunkError('Could not parse chunk size: %s' % (err,))

        if chunk_len == 0:
            break

        bytes_to_read = chunk_len
        while bytes_to_read:
            chunk = stream.read(bytes_to_read)
            if not chunk:
                # Guard against an infinite loop when the stream ends
                # before the full chunk has been received.
                raise DechunkError(
                    'Unexpected end of data while reading chunk.')

            bytes_to_read -= len(chunk)
            yield chunk

        # chunk ends with \r\n
        crlf = stream.read(2)
        if crlf != b'\r\n':
            raise DechunkError('No CR+LF at the end of chunk!')
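

# Illustrative sketch (added for clarity, not part of the original module):
# the chunked wire format is "<hex length>\r\n<data>\r\n", terminated by a
# zero-length chunk.
def _example_dechunk():
    import io

    body = io.BytesIO(b'4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n')
    assert b''.join(dechunk(body)) == b'Wikipedia'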


def to_chunks(stream_or_generator):
    """Normalize the input into a generator of byte chunks.
    :param file|__generator[bytes] stream_or_generator: readable stream or
           generator of bytes.
    :rtype: __generator[bytes]
    :raise: TypeError
    """

    if isinstance(stream_or_generator, types.GeneratorType):
        yield from stream_or_generator
    elif hasattr(stream_or_generator, 'read'):
        while True:
            chunk = stream_or_generator.read(CHUNK_SIZE)
            if not chunk:
                break  # no more data

            yield chunk
    else:
        raise TypeError('Input must be either readable or generator.')
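

# Illustrative sketch (added for clarity, not part of the original module):
# to_chunks() passes generators through unchanged and reads file-like
# objects in CHUNK_SIZE blocks.
def _example_to_chunks():
    import io

    assert list(to_chunks(io.BytesIO(b'abc'))) == [b'abc']
    assert list(to_chunks(chunk for chunk in [b'a', b'b'])) == [b'a', b'b']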


def decompress(chunks, compression):
    """Decompress body chunks.
    :param __generator[bytes] chunks: compressed body chunks.
    :param str compression: compression constant.
    :rtype: __generator[bytes]
    :return: decompressed chunks.
    :raise: TypeError, DecompressError
    """

    if compression not in SUPPORTED_COMPRESSIONS:
        raise TypeError('Unsupported compression type: %s' % (compression,))

    try:
        de_compressor = DECOMPRESSOR_FACTORIES[compression]()

        for chunk in chunks:
            try:
                yield de_compressor.decompress(chunk)
            except OSError as err:
                # BZ2Decompressor raises OSError on an invalid data stream.
                raise DecompressError(err) from None

        # BZ2Decompressor does not support the flush() interface.
        if hasattr(de_compressor, 'flush'):
            yield de_compressor.flush()

    except zlib.error as err:
        raise DecompressError(err) from None
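

# Illustrative sketch (added for clarity, not part of the original module):
# decompress() accepts an iterable of compressed byte chunks and yields the
# decompressed data.
def _example_decompress():
    import gzip

    gzipped = gzip.compress(b'hello')
    assert b''.join(decompress(iter([gzipped]), GZIP)) == b'hello'

    bzipped = bz2.compress(b'hello')
    assert b''.join(decompress(iter([bzipped]), BZIP2)) == b'hello'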


def read_body_stream(stream, chunked=False, compression=None):
    """Read HTTP body stream, yielding blocks of bytes. De-chunk and
    decompress the data if needed.
    :param file stream: readable stream.
    :param bool chunked: whether the stream is chunked.
    :param str|None compression: compression type if the stream is
           compressed, otherwise None.
    :rtype: __generator[bytes]
    :raise: TypeError, BodyStreamError
    """

    if not (chunked or compression):
        return to_chunks(stream)

    generator = stream
    if chunked:
        generator = dechunk(generator)

    if compression:
        generator = decompress(to_chunks(generator), compression)

    return generator
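

if __name__ == '__main__':
    # End-to-end usage sketch (added for illustration; the payload below is
    # made up): wrap a gzip-compressed payload in HTTP chunked transfer
    # encoding, then read it back through read_body_stream().
    import gzip
    import io

    payload = b'hello, chunked and gzipped world'
    gzipped = gzip.compress(payload)
    chunked_body = (('%x' % len(gzipped)).encode('ascii') + b'\r\n' +
                    gzipped + b'\r\n0\r\n\r\n')

    result = b''.join(read_body_stream(
        io.BytesIO(chunked_body), chunked=True, compression=GZIP))

    assert result == payload
    print('Read %d byte(s): %r' % (len(result), result))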
