#!/usr/bin/env python # -*- coding: utf-8 -*- ''' uploadform.py - A simple/minimalist yet flexible helper module to construct a POSTable mime message that is similar to a message generated by a browser when you submit an html form. Supports file uploads and even "incremental uploads" (by yielding) if you don't want to read the whole uploadable file into memory before POSTing it to the HTTP server. Provided "as is" - use it at your own risk and don't complain if it cuts down your limbs... Run this module to execute the test/example code. ''' import uuid, urllib __author__ = 'István Pásztor' __all__ = ['UploadForm'] class UploadForm: def __init__(self, boundary=None): self.set_boundary(boundary) self.fields = [] def set_boundary(self, boundary): self.boundary = urllib.quote(boundary) if boundary else '__--__{%s}__--__' % uuid.uuid4() def add_field(self, name, value, **params): '''see _Field.__init__() for the description of the parameters.''' field = _Field(name, value, **params) self.fields.append(field) return field def get_request_headers(self, calculate_content_length=True): assert self.fields headers = {'Content-Type': 'multipart/form-data; boundary='+self.boundary} if not calculate_content_length: return headers content_length = self.get_size() headers['Content-Length'] = str(content_length) return (headers, content_length) def __iter__(self): assert self.fields boundary = '--' + self.boundary + '\r\n' for field in self.fields: yield boundary for chunk in field: yield chunk yield '--' yield self.boundary yield '--\r\n' def __str__(self): return ''.join(s for s in self) def dump(self, f): for chunk in self: f.write(chunk) def get_size(self): return (2+len(self.boundary)+2) * (len(self.fields)+1) + 2 + sum(field.get_size() for field in self.fields) class _Header: def __init__(self, _header_name, _header_value, **params): self.name = _header_name self.value = _header_value self.params = params def __iter__(self): yield _stringify(self.name) yield ': ' yield _stringify(self.value) for name, value in self.params.iteritems(): yield '; ' yield _stringify(name) yield '="' yield _stringify(value).replace('\\', '\\\\').replace('"', '\\"') yield '"' yield '\r\n' def get_size(self): def _param_size(name, value): value = _stringify(value) return 5 + len(_stringify(name)) + len(value) + value.count('\\') + value.count('"') return 4 + len(_stringify(self.name)) + len(_stringify(self.value)) + sum(_param_size(name, value) for name,value in self.params.iteritems()) class _Field: BUFSIZE = 16*1024 # BUFSIZE used when yielding data from file objects def __init__(self, name, value, **params): ''' @param params: Contains additional key-value pairs besides the "name" parameter for the Content-Disposition header. Example: filename='x.zip' @param value: This parameter can be a string, a unicode object, or a (file-object, size) tuple. In case of unicode object the string is converted to utf-8 before converting to mime. In case of a file-object we read the data starting from the current file pointer. ''' params['name'] = name self.headers = [_Header('Content-Disposition', 'form-data', **params)] assert isinstance(value, (str, unicode, tuple)) self.value = value def add_header(self, name, value, **params): ''' You can add additional mime headers, for example a "Content-Type" header with "text/plain" value with optional parameters like charset="UTF-8"''' self.headers.append(_Header(name, value, **params)) def __iter__(self): for header in self.headers: for chunk in header: yield chunk yield '\r\n' if isinstance(self.value, str): yield self.value elif isinstance(self.value, unicode): yield self.value.encode('utf-8') else: bytes_left = self.value[1] while bytes_left > 0: bytes_to_read = min(self.BUFSIZE, bytes_left) data = self.value[0].read(bytes_to_read) if not data: raise Exception('The specified file object doesn\'t contain enough data!') yield data bytes_left -= len(data) yield '\r\n' def __str__(self): return ''.join(s for s in self) def get_size(self): size = sum(header.get_size() for header in self.headers) + 2 if isinstance(self.value, str): size += len(self.value) elif isinstance(self.value, unicode): size += len(self.value.encode('utf-8')) else: size += self.value[1] size += 2 # newline return size def _stringify(s): return s.encode('utf-8') if isinstance(s, unicode) else s if __name__ == '__main__': '''Test/example code.''' import os filename = 'x.zip' filepath = os.path.join('j:\\', filename) with open(filepath, 'rb') as f: form = UploadForm() # Adding a field by setting its value by reading from file like object... form.add_field('file', (f, os.path.getsize(filepath)), filename=filename) field = form.add_field('file_from_mem', u'This is the content of Pistike\'s text file.\n', filename='memfile.txt') # Adding headers is optional but we do it this time just to demonstrate it... field.add_header('Content-Type', 'text/plain', charset='UTF-8') field.add_header('MyHeader', 'my_header_value', quote_test_param='m\\y"param"2') form.add_field('comment', u'Árvíztűrő tükörfúrógép') # simulating a type="text" input field on the form with name="comment" form.add_field('my_checkbox', 'on') # simulating a type="checkbox" input field on the form with name="my_checkbox" headers, content_length = form.get_request_headers() content = str(form) print 'Headers: %s' % (headers,) print 'Content-Length: %s' % len(content) #print content assert len(content) == content_length import httplib conn = httplib.HTTPConnection('localhost') try: conn.request('POST', '/dir2/upload', content, headers=headers) resp = conn.getresponse() print resp.status print resp.read() finally: conn.close()