Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
uploadform.py - A simple/minimalist yet flexible helper module to construct a POSTable MIME
message that is similar to the message generated by a browser when you submit an HTML form.
Supports file uploads and even "incremental uploads" (by yielding chunks) if you don't
want to read the whole file to be uploaded into memory before POSTing it to the HTTP server.
Provided "as is" - use it at your own risk and don't complain if it cuts off your limbs...
Run this module to execute the test/example code.
'''
import uuid, urllib

__author__ = 'István Pásztor'
__all__ = ['UploadForm']

class UploadForm:
	'''
	Collects form fields and serializes them into a multipart/form-data message body.
	Iterating over the form yields the body chunk by chunk, str(form) joins all the chunks.
	'''
	def __init__(self, boundary=None):
		'''@param boundary: Optional custom boundary, see set_boundary() for the handling of this value.'''
		self.set_boundary(boundary)
		self.fields = []

	def set_boundary(self, boundary):
		'''Stores the url-quoted form of the specified boundary. A falsy value selects a newly generated uuid based boundary.'''
		if boundary:
			self.boundary = urllib.quote(boundary)
		else:
			self.boundary = '__--__{%s}__--__' % uuid.uuid4()

	def add_field(self, name, value, **params):
		'''Creates a field, appends it to the form and returns it. See _Field.__init__() for the description of the parameters.'''
		new_field = _Field(name, value, **params)
		self.fields.append(new_field)
		return new_field

	def get_request_headers(self, calculate_content_length=True):
		'''
		The form must contain at least one field when you call this.
		@return: Only the headers dict (with Content-Type in it) if calculate_content_length is false. Otherwise
				a (headers, content_length) tuple in which the headers dict contains the Content-Length as well.
		'''
		assert self.fields
		headers = {'Content-Type': 'multipart/form-data; boundary=' + self.boundary}
		if calculate_content_length:
			content_length = self.get_size()
			headers['Content-Length'] = str(content_length)
			return (headers, content_length)
		return headers

	def __iter__(self):
		'''Yields the chunks of the message body: a delimiter line before every field and a closing delimiter at the end.'''
		assert self.fields
		delimiter = '--' + self.boundary + '\r\n'
		for part in self.fields:
			yield delimiter
			for piece in part:
				yield piece
		# closing delimiter: the boundary enclosed between two pairs of dashes
		yield '--'
		yield self.boundary
		yield '--\r\n'

	def __str__(self):
		'''Returns the whole message body as one string.'''
		return ''.join(self)

	def dump(self, f):
		'''Writes the message body chunk by chunk to the specified file-like object by calling its write() method.'''
		for piece in self:
			f.write(piece)

	def get_size(self):
		'''Returns the size of the message body by summing the delimiter lines and the sizes reported by the fields.'''
		# every delimiter line is '--' + boundary + '\r\n'
		delimiter_size = 2 + len(self.boundary) + 2
		# one delimiter per field plus the closing one that has 2 extra dashes
		total = delimiter_size * (len(self.fields) + 1) + 2
		return total + sum(part.get_size() for part in self.fields)

class _Header:
	'''A single mime header line in the following form: Name: value; param1="..."; param2="..."'''
	def __init__(self, _header_name, _header_value, **params):
		'''
		@param _header_name: The name of the header.
		@param _header_value: The main value of the header that follows the colon.
		@param params: Optional key-value pairs that are appended to the main value as ; key="value" items.
				The positional parameters have underscore prefixed names so keys like "name" can be passed in here.
		'''
		self.name, self.value, self.params = _header_name, _header_value, params

	def __iter__(self):
		'''Yields the chunks of the header line. The line is terminated with a CRLF.'''
		yield _stringify(self.name)
		yield ': '
		yield _stringify(self.value)
		for key, raw in self.params.iteritems():
			yield '; '
			yield _stringify(key)
			yield '="'
			# backslashes and double quotes are escaped with a backslash inside the quoted value
			encoded = _stringify(raw)
			yield encoded.replace('\\', '\\\\').replace('"', '\\"')
			yield '"'
		yield '\r\n'

	def get_size(self):
		'''Returns the total length of the chunks yielded by __iter__() without building the header line.'''
		# 4 == len(': ') + len('\r\n')
		total = 4 + len(_stringify(self.name)) + len(_stringify(self.value))
		for key, raw in self.params.iteritems():
			encoded = _stringify(raw)
			# 5 == len('; ') + len('="') + len('"'), every escaped character costs one extra backslash
			total += 5 + len(_stringify(key)) + len(encoded) + encoded.count('\\') + encoded.count('"')
		return total

class _Field:
	'''One part of the multipart message: the mime headers of the field followed by its value.'''
	BUFSIZE = 16*1024	# the maximum size of the chunks we read and yield from file objects

	def __init__(self, name, value, **params):
		'''
		@param name: The name of the form field, stored as the "name" parameter of the Content-Disposition header.
		@param value: A string, a unicode object, or a (file-object, size) tuple. A unicode object is encoded
				to utf-8 when the field is serialized. The data of a file-object is read starting from
				its current file pointer.
		@param params: Additional key-value pairs for the Content-Disposition header. Example: filename='x.zip'
		'''
		params['name'] = name
		disposition = _Header('Content-Disposition', 'form-data', **params)
		self.headers = [disposition]
		assert isinstance(value, (str, unicode, tuple))
		self.value = value

	def add_header(self, name, value, **params):
		'''Appends an additional mime header, for example a "Content-Type" header with "text/plain" value and optional parameters like charset="UTF-8"'''
		extra = _Header(name, value, **params)
		self.headers.append(extra)

	def __iter__(self):
		'''Yields the chunks of the headers, an empty line, the chunks of the value and a closing CRLF.'''
		for header in self.headers:
			for piece in header:
				yield piece
		yield '\r\n'

		if isinstance(self.value, str):
			yield self.value
		elif isinstance(self.value, unicode):
			yield self.value.encode('utf-8')
		else:
			# (file-object, size) tuple: we read and yield exactly size bytes in at most BUFSIZE sized chunks
			remaining = self.value[1]
			while remaining > 0:
				wanted = min(self.BUFSIZE, remaining)
				block = self.value[0].read(wanted)
				if not block:
					raise Exception("The specified file object doesn't contain enough data!")
				yield block
				remaining -= len(block)
		yield '\r\n'

	def __str__(self):
		'''Returns the whole serialized field as one string.'''
		return ''.join(self)

	def get_size(self):
		'''Returns the size of the serialized field. In case of a file-object we use the size specified in the tuple.'''
		header_size = sum(header.get_size() for header in self.headers)

		if isinstance(self.value, str):
			value_size = len(self.value)
		elif isinstance(self.value, unicode):
			value_size = len(self.value.encode('utf-8'))
		else:
			value_size = self.value[1]

		# the first 2 is the empty line after the headers, the second is the newline after the value
		return header_size + 2 + value_size + 2

def _stringify(s):
	'''Returns unicode objects encoded to utf-8, any other object is returned unchanged.'''
	if isinstance(s, unicode):
		return s.encode('utf-8')
	return s

if __name__ == '__main__':
	# Test/example code: builds a form from a file on disk plus a few in-memory fields, checks the
	# precalculated content length against the serialized body and POSTs the body to localhost.
	import os

	filename = 'x.zip'
	filepath = os.path.join('j:\\', filename)
	with open(filepath, 'rb') as archive:
		upload = UploadForm()
		# The value of this field is read from a file like object...
		upload.add_field('file', (archive, os.path.getsize(filepath)), filename=filename)
		text_field = upload.add_field('file_from_mem', u'This is the content of Pistike\'s text file.\n', filename='memfile.txt')
		# Additional headers are optional, we add some only to demonstrate how to do it...
		text_field.add_header('Content-Type', 'text/plain', charset='UTF-8')
		text_field.add_header('MyHeader', 'my_header_value', quote_test_param='m\\y"param"2')
		upload.add_field('comment', u'Árvíztűrő tükörfúrógép')	# simulates a type="text" input field of the form with name="comment"
		upload.add_field('my_checkbox', 'on')						# simulates a type="checkbox" input field of the form with name="my_checkbox"
		headers, content_length = upload.get_request_headers()
		# the body must be serialized while the file is still open
		body = str(upload)

	print('Headers: %s' % (headers,))
	print('Content-Length: %s' % len(body))
	# print the body here if you want to inspect the generated mime message

	# the precalculated size must match the size of the serialized body
	assert len(body) == content_length

	import httplib

	connection = httplib.HTTPConnection('localhost')
	try:
		connection.request('POST', '/dir2/upload', body, headers=headers)
		response = connection.getresponse()
		print(response.status)
		print(response.read())
	finally:
		connection.close()

Diff to Previous Revision

--- revision 4 2014-03-08 14:34:49
+++ revision 5 2014-03-08 17:34:38
@@ -59,10 +59,17 @@
 		yield _stringify(self.name)
 		yield ': '
 		yield _stringify(self.value)
-		for chunk in _params_to_string(self.params):
-			yield chunk
+		for name, value in self.params.iteritems():
+			yield '; '
+			yield _stringify(name)
+			yield '="'
+			yield _stringify(value).replace('\\', '\\\\').replace('"', '\\"')
+			yield '"'
 		yield '\r\n'
 	def get_size(self):
+		def _param_size(name, value):
+			value = _stringify(value)
+			return 5 + len(_stringify(name)) + len(value) + value.count('\\') + value.count('"')
 		return 4 + len(_stringify(self.name)) + len(_stringify(self.value)) + sum(_param_size(name, value) for name,value in self.params.iteritems())
 
 class _Field:
@@ -116,25 +123,8 @@
 		size += 2	# newline
 		return size
 
-def _param_size(name, value):
-	value = _stringify(value)
-	return 5 + len(_stringify(name)) + len(value) + value.count('\\') + value.count('"')
-
-def _params_to_string(params):
-	for name, value in params.iteritems():
-		for chunk in _param_to_string(name, value):
-			yield chunk
-
-def _param_to_string(name, value):
-	yield '; '
-	yield _stringify(name)
-	yield '="'
-	yield _stringify(value).replace('\\', '\\\\').replace('"', '\\"')
-	yield '"'
-
 def _stringify(s):
 	return s.encode('utf-8') if isinstance(s, unicode) else s
-
 
 if __name__ == '__main__':
 	'''Test/example code.'''

History