Welcome, guest | Sign In | My Account | Store | Cart

Example read_body_stream() usage:

with open(http_file_path, 'rb') as fh:
    print(b''.join(httputil.read_body_stream(
        fh, chunked=True, compression=httputil.GZIP)))
Python, 185 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Utility functions to deal with HTTP stream: dechunking and decompressing
body etc.
"""

__author__ = 'vovanec@gmail.com'


import bz2
import functools
import types
import zlib


# Number of bytes requested per read() call when a readable stream is
# turned into a sequence of chunks.
CHUNK_SIZE = 16 * 1024

# Compression identifiers understood by this module.
GZIP = 'gzip'
DEFLATE = 'deflate'
BZIP2 = 'bzip2'

# Maps a compression identifier to a zero-argument callable returning a
# fresh streaming decompressor object.
DECOMPRESSOR_FACTORIES = {
    # Negative wbits: raw deflate stream without header or trailer.
    # NOTE(review): RFC 7230 describes the HTTP "deflate" coding as
    # zlib-wrapped; only raw deflate is accepted here -- confirm intended.
    DEFLATE: functools.partial(zlib.decompressobj, -zlib.MAX_WBITS),
    # 16 + wbits: input must carry a gzip header and trailer.
    GZIP: functools.partial(zlib.decompressobj, 16 + zlib.MAX_WBITS),
    BZIP2: bz2.BZ2Decompressor,
}

# Every compression identifier that has a decompressor factory.
SUPPORTED_COMPRESSIONS = set(DECOMPRESSOR_FACTORIES)


class BodyStreamError(Exception):
    """Base error for failures while reading an HTTP body stream.

    DechunkError and DecompressError derive from this class, so catching
    it covers both de-chunking and decompression failures.
    """


class DechunkError(BodyStreamError):
    """Error raised when a chunked body stream could not be de-chunked."""


class DecompressError(BodyStreamError):
    """Error raised when a compressed body stream could not be decompressed."""


def read_until(stream, delimiter, max_bytes=16):
    """Consume the stream one byte at a time until the delimiter is seen.

    :param file stream: readable file-like object opened in binary mode.
    :param bytes delimiter: byte sequence that terminates the read.
    :param int max_bytes: upper bound on bytes consumed, delimiter included.
    :rtype: bytes|None
    :return: the bytes preceding the delimiter (delimiter stripped), or
             None when the stream ends or ``max_bytes`` bytes were consumed
             before the delimiter showed up.
    """

    tail_len = len(delimiter)
    collected = bytearray()

    while len(collected) < max_bytes:
        piece = stream.read(1)

        if piece:
            collected += piece
            # Only the trailing bytes can complete the delimiter.
            if collected[-tail_len:] == delimiter:
                return bytes(collected[:-tail_len])
        else:
            # End of stream before the delimiter was found.
            return None

    # Byte budget exhausted without seeing the delimiter.
    return None


def dechunk(stream):
    """De-chunk HTTP body stream.

    Repeatedly reads a hexadecimal chunk-size line terminated by CR+LF,
    yields that many bytes of chunk data (possibly in several pieces when
    the stream returns short reads) and checks the CR+LF that closes the
    chunk. Iteration stops at the zero-sized chunk; nothing after its size
    line is consumed.

    :param file stream: readable file-like object.
    :rtype: __generator[bytes]
    :raise: DechunkError if the size line is missing or malformed, the
            size is negative, the stream ends inside a chunk, or a chunk
            is not followed by CR+LF.
    """

    # TODO(vovan): Add support for chunk extensions:
    # TODO(vovan): http://tools.ietf.org/html/rfc2616#section-3.6.1

    while True:
        size_line = read_until(stream, b'\r\n')

        if size_line is None:
            raise DechunkError(
                'Could not extract chunk size: unexpected end of data.')

        try:
            chunk_len = int(size_line.strip(), 16)
        except (ValueError, TypeError) as err:
            raise DechunkError('Could not parse chunk size: %s' % (err,))

        # int() accepts a sign; a negative size would turn into
        # stream.read(<negative>) below, i.e. "read everything".
        if chunk_len < 0:
            raise DechunkError(
                'Could not parse chunk size: negative size %d' % (chunk_len,))

        if chunk_len == 0:
            break

        bytes_to_read = chunk_len
        while bytes_to_read:
            chunk = stream.read(bytes_to_read)
            if not chunk:
                # Without this check a truncated stream would make the loop
                # spin forever, yielding empty chunks.
                raise DechunkError(
                    'Unexpected end of data: %d more chunk bytes expected.'
                    % (bytes_to_read,))

            bytes_to_read -= len(chunk)
            yield chunk

        # chunk ends with \r\n
        crlf = stream.read(2)
        if crlf != b'\r\n':
            raise DechunkError('No CR+LF at the end of chunk!')


def to_chunks(stream_or_generator):
    """Normalize a readable stream or a generator into a chunk generator.

    A generator is re-yielded unchanged; an object with a ``read``
    attribute is read in CHUNK_SIZE blocks until a read returns an empty
    result. Being a generator function itself, the TypeError for
    unsupported input is raised on first iteration, not at call time.

    :param file|__generator[bytes] stream_or_generator: readable stream or
           generator.
    :rtype: __generator[bytes]
    :raise: TypeError
    """

    if isinstance(stream_or_generator, types.GeneratorType):
        yield from stream_or_generator
        return

    if not hasattr(stream_or_generator, 'read'):
        raise TypeError('Input must be either readable or generator.')

    block = stream_or_generator.read(CHUNK_SIZE)
    while block:
        yield block
        block = stream_or_generator.read(CHUNK_SIZE)


def decompress(chunks, compression):
    """Decompress a sequence of compressed body chunks.

    This is a generator function: nothing happens, including the check of
    ``compression``, until the result is first iterated. Every result
    returned by the decompressor is yielded as-is, followed by the
    result of ``flush()`` when the decompressor provides one.

    :param __generator[bytes] chunks: compressed body chunks.
    :param str compression: compression constant, one of
           SUPPORTED_COMPRESSIONS.
    :rtype: __generator[bytes]
    :return: decompressed chunks.
    :raise: TypeError, DecompressError
    """

    if compression not in SUPPORTED_COMPRESSIONS:
        raise TypeError('Unsupported compression type: %s' % (compression,))

    try:
        de_compressor = DECOMPRESSOR_FACTORIES[compression]()

        for chunk in chunks:
            try:
                data = de_compressor.decompress(chunk)
            except (OSError, EOFError) as err:
                # BZ2Decompressor: OSError on an invalid data stream,
                # EOFError when fed data after the end of the stream.
                raise DecompressError(err) from None

            # Yield outside the inner try so only the decompress() call
            # itself is guarded by it.
            yield data

        # BZ2Decompressor does not support flush() interface.
        if hasattr(de_compressor, 'flush'):
            yield de_compressor.flush()

    except zlib.error as err:
        raise DecompressError(err) from None


def read_body_stream(stream, chunked=False, compression=None):
    """Read HTTP body stream, yielding blocks of bytes.

    De-chunking is applied first when ``chunked`` is true; decompression
    is then applied to the result when ``compression`` is given. With
    neither option the stream is simply split into chunks.

    :param file stream: readable stream.
    :param bool chunked: whether stream is chunked.
    :param str|None compression: compression type if stream is
           compressed, otherwise None.
    :rtype: __generator[bytes]
    :raise: TypeError, BodyStreamError
    """

    body = dechunk(stream) if chunked else stream

    if compression:
        return decompress(to_chunks(body), compression)

    if chunked:
        # De-chunked data is already a generator of byte blocks.
        return body

    return to_chunks(stream)

Please refer to https://github.com/vovanec/httputil for more utilities for dealing with HTTP data.

Created by Vovan on Tue, 30 Jun 2015 (MIT)
Python recipes (4591)
Vovan's recipes (3)

Required Modules

  • (none specified)

Other Information and Tasks