#!/usr/bin/env python
# -*- coding: utf-8 -*-
uploadform.py - A simple/minimalist yet flexible helper module to construct a POSTable mime
message that is similar to a message generated by a browser when you submit an html form.
Supports file uploads and even "incremental uploads" (by yielding) if you don't
want to read the whole uploadable file into memory before POSTing it to the HTTP server.
Provided "as is" - use it at your own risk and don't complain if it cuts down your limbs...
Run this module to execute the test/example code.
import uuid, urllib
__author__ = 'István Pásztor'
__all__ = ['UploadForm']
class UploadForm:
def __init__(self, boundary=None):
self.fields = []
def set_boundary(self, boundary):
self.boundary = urllib.quote(boundary) if boundary else '__--__{%s}__--__' % uuid.uuid4()
def add_field(self, name, value, **params):
'''see _Field.__init__() for the description of the parameters.'''
field = _Field(name, value, **params)
return field
def get_request_headers(self, calculate_content_length=True):
assert self.fields
headers = {'Content-Type': 'multipart/form-data; boundary='+self.boundary}
if not calculate_content_length:
return headers
content_length = self.get_size()
headers['Content-Length'] = str(content_length)
return (headers, content_length)
def __iter__(self):
assert self.fields
boundary = '--' + self.boundary + '\r\n'
for field in self.fields:
yield boundary
for chunk in field:
yield chunk
yield '--'
yield self.boundary
yield '--\r\n'
def __str__(self):
return ''.join(s for s in self)
def dump(self, f):
for chunk in self:
def get_size(self):
return (2+len(self.boundary)+2) * (len(self.fields)+1) + 2 + sum(field.get_size() for field in self.fields)
class _Header:
def __init__(self, _header_name, _header_value, **params):
self.name = _header_name
self.value = _header_value
self.params = params
def __iter__(self):
yield _stringify(self.name)
yield ': '
yield _stringify(self.value)
for name, value in self.params.iteritems():
yield '; '
yield _stringify(name)
yield '="'
yield _stringify(value).replace('\\', '\\\\').replace('"', '\\"')
yield '"'
yield '\r\n'
def get_size(self):
def _param_size(name, value):
value = _stringify(value)
return 5 + len(_stringify(name)) + len(value) + value.count('\\') + value.count('"')
return 4 + len(_stringify(self.name)) + len(_stringify(self.value)) + sum(_param_size(name, value) for name,value in self.params.iteritems())
class _Field:
BUFSIZE = 16*1024 # BUFSIZE used when yielding data from file objects
def __init__(self, name, value, **params):
@param params: Contains additional key-value pairs besides the "name" parameter for the Content-Disposition header. Example: filename='x.zip'
@param value: This parameter can be a string, a unicode object, or a (file-object, size) tuple. In case of unicode object the string
is converted to utf-8 before converting to mime. In case of a file-object we read the data starting from the current file pointer.
params['name'] = name
self.headers = [_Header('Content-Disposition', 'form-data', **params)]
assert isinstance(value, (str, unicode, tuple))
self.value = value
def add_header(self, name, value, **params):
''' You can add additional mime headers, for example a "Content-Type" header with "text/plain" value with optional parameters like charset="UTF-8"'''
self.headers.append(_Header(name, value, **params))
def __iter__(self):
for header in self.headers:
for chunk in header:
yield chunk
yield '\r\n'
if isinstance(self.value, str):
yield self.value
elif isinstance(self.value, unicode):
yield self.value.encode('utf-8')
bytes_left = self.value[1]
while bytes_left > 0:
bytes_to_read = min(self.BUFSIZE, bytes_left)
data = self.value[0].read(bytes_to_read)
if not data:
raise Exception('The specified file object doesn\'t contain enough data!')
yield data
bytes_left -= len(data)
yield '\r\n'
def __str__(self):
return ''.join(s for s in self)
def get_size(self):
size = sum(header.get_size() for header in self.headers) + 2
if isinstance(self.value, str):
size += len(self.value)
elif isinstance(self.value, unicode):
size += len(self.value.encode('utf-8'))
size += self.value[1]
size += 2 # newline
return size
def _stringify(s):
return s.encode('utf-8') if isinstance(s, unicode) else s
if __name__ == '__main__':
'''Test/example code.'''
import os
filename = 'x.zip'
filepath = os.path.join('j:\\', filename)
with open(filepath, 'rb') as f:
form = UploadForm()
# Adding a field by setting its value by reading from file like object...
form.add_field('file', (f, os.path.getsize(filepath)), filename=filename)
field = form.add_field('file_from_mem', u'This is the content of Pistike\'s text file.\n', filename='memfile.txt')
# Adding headers is optional but we do it this time just to demonstrate it...
field.add_header('Content-Type', 'text/plain', charset='UTF-8')
field.add_header('MyHeader', 'my_header_value', quote_test_param='m\\y"param"2')
form.add_field('comment', u'ÃrvÃztűrÅ‘ tükörfúrógép') # simulating a type="text" input field on the form with name="comment"
form.add_field('my_checkbox', 'on') # simulating a type="checkbox" input field on the form with name="my_checkbox"
headers, content_length = form.get_request_headers()
content = str(form)
print 'Headers: %s' % (headers,)
print 'Content-Length: %s' % len(content)
#print content
assert len(content) == content_length
import httplib
conn = httplib.HTTPConnection('localhost')
conn.request('POST', '/dir2/upload', content, headers=headers)
resp = conn.getresponse()
print resp.status
print resp.read()