
Bandwidth testing is easy with Python's built-in web access, HTML parsing, and threading modules.

When Fedora Core 2 was released, I wanted to find out which download mirror would be fastest for downloading the CD images. It only took about an hour to whip up this mirror bandwidth tester.

This script demonstrates web download, HTML parsing, and some interesting threading issues.

Python, 151 lines
from HTMLParser import HTMLParser
import urllib
import time
import Queue
import threading
import urlparse

MIRRORS_URL = 'http://fedora.redhat.com/download/mirrors.html'
MAX_THREADS = 50
HTTP_TIMEOUT = 60.0   # Max. seconds to wait for a response

class UrlFinder(HTMLParser):

    '''Subclass of the HTMLParser object.  Records the HREF attributes
    of anchor tags if the scheme is 'http' and the anchor occurs in
    the 'content' section of the page.'''
    
    def __init__(self):
        HTMLParser.__init__(self)
        self.mirrorLinks = []  

        # True if we're currently in the 'content' section
        self.isInMirrors = False
        
    def handle_comment(self, data):

        # The comments have spaces before and after, but don't count
        # on that.
        data = data.strip()

        if 'content BEGIN' == data:
            self.isInMirrors = True
        elif 'content END' == data:
            self.isInMirrors = False

    def handle_starttag(self, tag, attrs):
        if self.isInMirrors:
            attrs = dict(attrs) # Convert from tuple of tuples to dict
            if 'a' == tag and 'http' == urllib.splittype(attrs.get('href', ''))[0]:
                self.mirrorLinks.append(attrs['href'])

# Record the start time, so we can print a nice message at the end
processStartTime = time.time()

# Create the parser, get the 'mirrors' page from Red Hat,
# and extract the URLs
print "Getting mirrors list...",
parser = UrlFinder()
parser.feed(urllib.urlopen(MIRRORS_URL).read())


print len(parser.mirrorLinks), "mirrors found."
numThreads = min(MAX_THREADS, len(parser.mirrorLinks))
print "Testing bandwidth with", numThreads, "threads..."

# Build a queue to feed the worker threads
workQueue = Queue.Queue()
for url in parser.mirrorLinks:
    workQueue.put(url)

def TestUrl(workQueue, resultQueue):

    ''' Worker thread procedure.  Test how long it takes to return the
    mirror index page, and stuff the results into resultQueue.'''
    
    def SubthreadProc(url, result):

        ''' Subthread procedure.  Actually get the mirror index page
        in a subthread, so that we can time out using join rather than
        wait for a very slow server.  Passing in a list for result
        lets us simulate pass-by-reference, since callers cannot get
        the return code from a Python thread.'''
        
        startTime = time.time()
        try:
            data = urllib.urlopen(url).read()
        except Exception:
            # Could be a socket error or an HTTP error--either way, we
            # don't care--it's a failure to us.
            result.append(-1)
        else:
            elapsed = int((time.time() - startTime) * 1000)
            result.append(elapsed)

            
    while 1:
        # Continue pulling data from the work queue until it's empty
        try:
            url = workQueue.get(0)
        except Queue.Empty:
            # work queue is empty--exit the thread proc.
            return

        # Create a single subthread to do the actual work
        result = []
        subThread = threading.Thread(target=SubthreadProc, args=(url, result))

        # Daemonize the subthread so that even if a few are hanging
        # around when the process is done, the process will exit.
        subThread.setDaemon(True)

        # Run the subthread and wait for it to finish, or time out
        subThread.start()
        subThread.join(HTTP_TIMEOUT)

        if [] == result:
            # Subthread hasn't given a result yet.  Consider it timed out.
            resultQueue.put((url, "TIMEOUT"))
        elif -1 == result[0]:
            # Subthread returned an error from urlopen.
            resultQueue.put((url, "FAILED"))
        else:
            # Subthread returned a time.  Store it.
            resultQueue.put((url, result[0]))
        

workers = []
resultQueue = Queue.Queue()

# Create worker threads to load-balance the retrieval
for threadNum in range(0, numThreads):
    workers.append(threading.Thread(target=TestUrl,
                                    args=(workQueue,resultQueue)))
    workers[-1].start()

# Wait for all the workers to finish
for w in workers:
    w.join()

# Separate the successes from failures
timings = []
failures = []
while not resultQueue.empty():
    url, result = resultQueue.get(0)
    if isinstance(result, str):
        failures.append((result, url))
    else:
        timings.append((result, url))

# Sort by increasing time or result string
timings.sort()
failures.sort()

# Print the results
print "\nMirrors (ordered fastest to slowest)"
for result, url in timings:
    print "%7d %s" % (result, url)
for result, url in failures:
    print "%7s %s" % (result, url)

print "\nProcess completed in ", time.time() - processStartTime, " seconds."

The concept of the script is straightforward: read the mirrors page from Red Hat's web site, make a list of all the mirrors, test how long it takes to download from each, and present a sorted list of the results.

The first task, reading and parsing the Red Hat mirrors list, is handled with the urllib and HTMLParser modules, respectively. I chose HTMLParser over the more comprehensive parser in sgmllib because it's a bit less work to override the default parser for simple tasks. Once the parser sees the 'content BEGIN' comment in the HTML source, it starts recording the HREF of any anchor tag whose URL has a scheme of 'http'; it stops recording when it sees the 'content END' comment. It happens that there currently aren't any absolute URLs on the mirrors page outside the content block, but I didn't want to rely on that fact.
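To see the parser's behavior in isolation, you can feed UrlFinder a small hand-written fragment shaped like the mirrors page. The snippet below is made up for illustration and assumes the UrlFinder class from the listing above:

snippet = '''
<a href="http://outside.example.com/">ignored: outside the content block</a>
<!-- content BEGIN -->
<a href="http://mirror1.example.com/fedora/">Mirror 1</a>
<a href="ftp://mirror2.example.com/fedora/">ignored: scheme is not http</a>
<!-- content END -->
'''

finder = UrlFinder()
finder.feed(snippet)
print finder.mirrorLinks   # ['http://mirror1.example.com/fedora/']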

To test the bandwidth of each mirror site, I simply test how long it takes to download the index page of the mirror. This is not a perfect test, but it gives reasonably good results without depending on knowledge of the site structure.
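The timing itself is nothing more than wrapping the download in a pair of time.time() calls. A minimal sketch of the idea, with a hypothetical mirror URL standing in for a real one:

import time
import urllib

url = 'http://mirror.example.com/fedora/'   # hypothetical mirror index page
startTime = time.time()
urllib.urlopen(url).read()
print "Index page fetched in %d ms" % int((time.time() - startTime) * 1000)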

The bandwidth test demonstrates two important principles for multithreaded code, in Python or any other language:

  1. Let the underlying libraries do as much work as possible.
  2. Isolate your threads from the rest of the program.

The main thread creates a work queue of URLs to be tested and a result queue for retrieving results, then starts a number of threads to do the work and waits for those threads to exit. Because the Queue class is a threadsafe container, Python guarantees that no two threads will ever get the same work unit, and the storing of results by multiple threads will never leave the queue in a bad state.
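Stripped of the mirror-specific details, the work-queue pattern looks roughly like this; the squaring "work" is just a placeholder for whatever each thread actually does:

import Queue
import threading

workQueue = Queue.Queue()
resultQueue = Queue.Queue()
for item in range(10):
    workQueue.put(item)

def Worker(workQueue, resultQueue):
    while 1:
        try:
            item = workQueue.get(0)   # non-blocking; raises Queue.Empty when drained
        except Queue.Empty:
            return
        resultQueue.put((item, item * item))   # stand-in for the real work

workers = [threading.Thread(target=Worker, args=(workQueue, resultQueue))
           for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

while not resultQueue.empty():
    print resultQueue.get(0)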

Initially, each worker thread downloaded the mirror index page directly, but this caused the process to run for a long time (over three minutes) when some sites were heavily loaded. To avoid this, I defined a maximum time to attempt downloading, and made each worker thread spawn a new daemon thread to do the download. The worker thread can use Thread.join() to wait on the subthread with a timeout; timeouts are counted as failures. Note that I pass an empty list to the subthread to collect the results. Threads in Python don't have a convenient way to return a status code back to the caller; by passing a mutable object like a list, the subthread can append a value to the list to indicate a result. When the join() on the subthread completes, the worker thread can tell that it timed out if the list it passed in is empty.
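Here is the timeout technique by itself, a minimal sketch in which the made-up slow_task function and a five-second limit stand in for the real download and HTTP_TIMEOUT:

import threading
import time

def slow_task(result):
    time.sleep(10)            # pretend this is a slow download
    result.append('done')

result = []
subThread = threading.Thread(target=slow_task, args=(result,))
subThread.setDaemon(True)     # a hung thread won't keep the process alive
subThread.start()
subThread.join(5.0)           # wait at most 5 seconds

if [] == result:
    print "timed out"
else:
    print result[0]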

The worker threads put the results for each URL into a results queue. For successful tests, they put a tuple of the URL and the time it took to download; for unsuccessful results, they put a tuple of the URL and a string describing the type of failure. When the main thread has detected that all worker threads have exited, it separates successes from failures, sorts the two lists, and prints them in aligned columns.
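The fastest-to-slowest ordering falls out of Python's tuple comparison: because the timing comes first in each (result, url) tuple, a plain sort() orders by time. For example, with made-up values:

timings = [(812, 'http://slow.example.com/'), (95, 'http://fast.example.com/')]
timings.sort()
print timings   # [(95, 'http://fast.example.com/'), (812, 'http://slow.example.com/')]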

Note that the script could be written without the second-level threads. Using them helps isolate the failure-prone download from the more reliable worker thread pool, at the cost of a few more ephemeral threads, and provides a good demonstration of how and when to use daemon threads to keep a script from hanging indefinitely at shutdown.

This script is useful for finding out which mirrors are most heavily loaded, but it has shortcomings. Some HTTP-based mirrors are actually redirects to FTP mirrors, and some seem to apply different bandwidth throttles to index pages than to ISO downloads. Additionally, the script can't tell which mirrors actually have up-to-date files; that can't easily be fixed without knowledge of each mirror site, since mirror sites differ in their directory structure. But it at least gives the would-be upgrader an idea of where to look.