Grabs files from the web using multithreading in an attempt to enhance the transfer rate.

Python, 179 lines
import os
import urllib2
import time
import multiprocessing.dummy as multiprocessing
import shutil
import socket
from ctypes import c_int
import tempfile

import dummy
from logger import log

"Smart Downloading Module. Written by Itay Brandes."

shared_bytes_var = multiprocessing.Value(c_int, 0) # a ctypes var that counts the bytes already downloaded

def DownloadFile(url, path, startByte=0, endByte=None, ShowProgress=True):
	'''
	Function downloads a file.
	@param url: File url address.
	@param path: Destination file path.
	@param startByte: Start byte.
	@param endByte: End byte. Only honored if the server supports HTTP Range headers.
	@param ShowProgress: If true, shows textual progress bar. 
	
	@return path: Destination file path.
	'''
	url = url.replace(' ', '%20')
	headers = {}
	if endByte is not None:
		headers['Range'] = 'bytes=%d-%d' % (startByte,endByte)
	req = urllib2.Request(url, headers=headers)
	
	try:
		urlObj = urllib2.urlopen(req, timeout=4)
	except urllib2.HTTPError as e:
		if e.code == 416:
			# HTTP 416: Requested Range Not Satisfiable. Happens when we ask
			# for a range that is not available on the server, typically when
			# the server tries to send us an HTML page that means something like
			# "you opened too many connections to our server". If this happens,
			# we wait for the other threads to finish their connections and try again.
			
			log.warning("Thread didn't get the file it was expecting. Retrying...")
			time.sleep(5)
			return DownloadFile(url, path, startByte, endByte, ShowProgress)
		else:
			raise # bare raise preserves the original traceback
			
	f = open(path, 'wb')
	meta = urlObj.info()
	try:
		filesize = int(meta.getheaders("Content-Length")[0])
	except IndexError:
		log.warning("Server did not send Content-Length.")
		ShowProgress = False
	
	filesize_dl = 0
	block_sz = 8192
	while True:
		try:
			buff = urlObj.read(block_sz)
		except (socket.timeout, socket.error, urllib2.HTTPError):
			try:
				# Roll back this thread's contribution to the shared counter.
				dummy.shared_bytes_var.value -= filesize_dl
			except AttributeError: # not running under the pool initializer
				pass
			raise
			
		if not buff:
			break

		filesize_dl += len(buff)
		try:
			dummy.shared_bytes_var.value += len(buff)
		except AttributeError:
			pass
		f.write(buff)
		
		if ShowProgress:
			status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (filesize_dl / 1024.0 / 1024.0,
					filesize / 1024.0 / 1024.0, progress_bar(1.0*filesize_dl/filesize),
					filesize_dl * 100.0 / filesize)
			status += chr(8)*(len(status)+1)
			print status,
	if ShowProgress:
		print "\n"
		
	f.close()
	return path
	
def DownloadFile_Parall(url, path=None, processes=6,
							minChunkFile=1024**2, nonBlocking=False):
	'''
	Function downloads a file using several parallel connections.
	@param url: File url address.
	@param path: Destination file path.
	@param processes: Number of processes to use in the pool.
	@param minChunkFile: Minimum chunk size in bytes. Below this size the file
						is downloaded over a single connection.
	@param nonBlocking: If true, returns (mapObj, pool). A list of file parts can be
						read from the mapObj results, and the caller must combine them
						(see combine_files) and close and join the pool.
	
	@return mapObj: Only if nonBlocking is True. A multiprocessing.pool.AsyncResult object.
	@return pool: Only if nonBlocking is True. A multiprocessing.pool object.
	'''
	from HTTPQuery import Is_ServerSupportHTTPRange
	global shared_bytes_var
	shared_bytes_var.value = 0
	url = url.replace(' ', '%20')
	
	if not path:
		path = get_rand_filename(tempfile.gettempdir()) # portable temp directory
	log.debug("Downloading to %s..." % path)
	
	urlObj = urllib2.urlopen(url)
	meta = urlObj.info()
	filesize = int(meta.getheaders("Content-Length")[0])
	
	if filesize/processes > minChunkFile and Is_ServerSupportHTTPRange(url):
		args = []
		pos = 0
		chunk = filesize/processes
		for i in range(processes):
			startByte = pos
			endByte = pos + chunk
			if endByte > filesize-1:
				endByte = filesize-1
			args.append([url, path+".%.3d" % i, startByte, endByte, False])
			pos += chunk+1
	else:
		args = [[url, path+".000", None, None, False]]
			
	log.debug("Running %d worker threads..." % processes)
	pool = multiprocessing.Pool(processes, initializer=_initProcess, initargs=(shared_bytes_var,))
	mapObj = pool.map_async(lambda x: DownloadFile(*x), args)
	if nonBlocking:
		return mapObj, pool
	while not mapObj.ready():
		status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (shared_bytes_var.value / 1024.0 / 1024.0,
				filesize / 1024.0 / 1024.0, progress_bar(1.0*shared_bytes_var.value/filesize),
				shared_bytes_var.value * 100.0 / filesize)
		status = status + chr(8)*(len(status)+1)
		print status,
		time.sleep(0.1)

	file_parts = mapObj.get()
	pool.close()
	pool.join()
	
	combine_files(file_parts, path)

def combine_files(parts, path):
	'''
	Function combines file parts.
	@param parts: List of file paths.
	@param path: Destination path.
	'''
	with open(path,'wb') as output:
		for part in parts:
			with open(part,'rb') as f:
				shutil.copyfileobj(f, output) # stream copy; avoids reading each part into memory
			os.remove(part)
			
def progress_bar(progress, length=20):
	'''
	Function creates a textual progress bar.
	@param progress: Float between 0 and 1 describing the progress.
	@param length: The length of the progress bar in chars. Default is 20.
	'''
	length -= 2 # The brackets are 2 chars long.
	return "[" + "#"*int(progress*length) + "-"*(length-int(progress*length)) + "]"	

def get_rand_filename(dir_=os.getcwd()):
	"Function returns the path of a fresh random temp file."
	fd, path = tempfile.mkstemp('.tmp', '', dir_)
	os.close(fd) # mkstemp opens the file; close the handle so it can be reopened for writing
	return path

def _initProcess(x):
	# Pool initializer: expose the shared byte counter to the worker
	# threads through the dummy module.
	dummy.shared_bytes_var = x
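A minimal usage sketch (the module name smart_dl.py and the URL are my own placeholders, not part of the recipe): save the recipe next to the dummy and logger helpers described below, then:

import smart_dl

# Blocking call: downloads with 6 worker threads and prints a progress bar.
smart_dl.DownloadFile_Parall('http://example.com/big_file.zip',
							path='big_file.zip', processes=6)

# Non-blocking call: the caller collects the parts and cleans up the pool.
mapObj, pool = smart_dl.DownloadFile_Parall('http://example.com/big_file.zip',
							path='big_file.zip', nonBlocking=True)
parts = mapObj.get()
pool.close()
pool.join()
smart_dl.combine_files(parts, 'big_file.zip')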

Opening multiple simultaneous connections to a server can enhance the transfer rate dramatically, for example when the server throttles bandwidth per connection. This module extends urllib2.urlopen with that idea in mind.
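The parallel path depends on the server honoring HTTP Range requests; the recipe delegates that check to an Is_ServerSupportHTTPRange helper in an HTTPQuery module that is not shown here. A minimal sketch of such a check (my own approximation, not the recipe's actual code) could be:

import urllib2

def is_range_supported(url):
	# Ask for the first byte only; a server that honors Range headers
	# replies with 206 Partial Content instead of 200 OK.
	req = urllib2.Request(url, headers={'Range': 'bytes=0-0'})
	return urllib2.urlopen(req).getcode() == 206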

The script uses two external modules: dummy is an empty Python file, used as a medium for sharing data across threads, and logger.log is the default logger of my project. You can comment out the four logger calls if you don't need them. Minimal stand-ins for both are sketched below.
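If you are not using the author's project, stand-ins like these are enough to satisfy the imports (my own stubs, not part of the original recipe):

# dummy.py -- intentionally left empty: _initProcess() attaches the
# shared byte counter to this module at runtime so every worker
# thread can reach it.

# logger.py -- minimal replacement for the project's logger
import logging
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('SmartDL')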

4 comments

a, 11 years, 9 months ago

It was proven that opening multiple sessions to the server may enhance transfer rate dramatically.

I would very much like to see that proof. Please provide source and/or literature.

Itay Brandes (author), 10 years, 3 months ago

Now released as a package: https://pypi.python.org/pypi/pySmartDL

Nguyen Van Long, 8 years, 5 months ago

Thanks for sharing!