
A threaded pipe that combines queues and threads for efficient data processing.

Python, 43 lines
"""tp.py"""
import threading, Queue, inspect

class Pipe(threading.Thread, Queue.Queue):
        def __init__(self, input=None):
                threading.Thread.__init__(self)
                Queue.Queue.__init__(self)

                fnargs = inspect.getargspec(self.fn)[0]  # argument names, including 'self'
                if input is None and len(fnargs) != 1:
                        raise TypeError('no arguments for fn() if pipe is faucet')
                if input is not None and len(fnargs) != 2:
                        raise TypeError('1 argument for fn() if pipe has input')
                if input is not None and not isinstance(input, Queue.Queue):
                        raise TypeError('queue not provided as input')

                self.input = input

        def fn(self):
                """
                this function gets overridden
                must return something to fill the queue with
                """
                pass

        def run(self):
                """
                runs the overridden fn()
                if fn() returns False, put it on the queue and quit
                so any other pipes will get it and pass it on, quitting
                if fn() returns None, don't put anything
                """
                ret = True 
                while ret is not False:
                        if self.input is None: # it's a faucet
                                ret = self.fn()
                        elif hasattr(self.input, 'get'): # it's a pipe
                                ob = self.input.get()
                                if ob is False:
                                        self.put(ob)
                                        break
                                ret = self.fn(ob)
                        if ret is not None: self.put(ret)

I created this class to make better use of time when dealing with network programming and the like. I was processing information, submitting it to be uploaded somewhere, waiting for a signal that the upload had finished, and then doing something else, all in threads. It was a messy task. Tasks like these are now simplified by the threaded pipe.

Basically, each pipe is chained to the next and the information flows through them like water. fn(), which does the main processing, is analogous to the length of the pipe, while the pipe in its entirety can be treated as a queue.

The real work is done by overriding fn(). In your own subclass, have fn() take a single argument (an object from the previous pipe) and return an object of any type to be passed on to the next pipe. Returning None puts nothing on the queue. If the pipe begins a chain (a faucet, one that produces information instead of processing and passing it along), have fn() take no arguments. The number of arguments fn() takes must match the kind of pipe you create, otherwise the constructor raises a TypeError.
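The subclassing rules above can be sketched as follows. This is a minimal sketch, assuming Python 3: the Pipe class here is a port of the recipe (the `queue` module instead of `Queue`, and `inspect.signature` instead of `getargspec`, which was removed in Python 3.11), not the original code. Counter and Doubler are hypothetical names made up for illustration.

```python
import inspect
import queue
import threading


class Pipe(threading.Thread, queue.Queue):
    """Python 3 port of the recipe's Pipe (an adaptation, not the original)."""

    def __init__(self, input=None):
        threading.Thread.__init__(self)
        queue.Queue.__init__(self)
        # For a bound method, signature() leaves out 'self'.
        nparams = len(inspect.signature(self.fn).parameters)
        if input is None and nparams != 0:
            raise TypeError('no arguments for fn() if pipe is faucet')
        if input is not None and nparams != 1:
            raise TypeError('1 argument for fn() if pipe has input')
        if input is not None and not isinstance(input, queue.Queue):
            raise TypeError('queue not provided as input')
        self.input = input

    def fn(self):
        pass

    def run(self):
        ret = True
        while ret is not False:
            if self.input is None:      # faucet: produce a value
                ret = self.fn()
            else:                       # pipe: transform the next input
                ob = self.input.get()
                if ob is False:
                    self.put(ob)        # pass the shutdown signal along
                    break
                ret = self.fn(ob)
            if ret is not None:
                self.put(ret)


class Counter(Pipe):
    """Hypothetical faucet: emits 1, 2, 3, then False to stop the chain."""

    def __init__(self):
        Pipe.__init__(self)
        self.numbers = iter([1, 2, 3])

    def fn(self):
        return next(self.numbers, False)


class Doubler(Pipe):
    """Hypothetical processing pipe: takes one object, returns another."""

    def fn(self, n):
        return n * 2


counter = Counter()
doubler = Doubler(counter)
counter.start()
doubler.start()

# The last pipe is itself a queue, so results are read with get().
results = []
while True:
    item = doubler.get()
    if item is False:
        break
    results.append(item)

counter.join()   # Thread.join comes first in the MRO, so this waits for the thread
doubler.join()
print(results)
```

Creating `Doubler()` without an input raises the TypeError described above, because its fn() expects one argument but a faucet's fn() must take none.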

A simple example of pipes in action would be something like this:

orders = OrderQueue(orderDir)
processed = OrderProcessor(orders)
uploaded = Uploader(processed)
finalized = Finalizer(uploaded)

Note that OrderQueue, OrderProcessor, Uploader, and Finalizer are all threaded pipes, each with its own overridden fn(). The beauty is that they work concurrently (not truly in parallel, because of Python's global interpreter lock). Uploader can be uploading orders already queued in OrderProcessor while OrderProcessor is processing orders queued in OrderQueue, and so on. It makes maximum use of time.

To kill a chain of threaded pipes, simply have the faucet's fn() return False. The faucet then puts the False on its queue and exits its thread. Every other pipe along the line does the same when it sees a False.
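The shutdown rule can be seen in isolation with a stripped-down sketch. It assumes Python 3 and uses plain functions with `queue.Queue` rather than the Pipe class; the names `stage`, `STOP`, and `exit_log` are hypothetical and only there for illustration.

```python
import queue
import threading

STOP = False  # the recipe uses False as its shutdown signal


def stage(name, inbox, outbox, log):
    """Forward items from inbox to outbox until the shutdown signal arrives."""
    while True:
        item = inbox.get()
        if item is STOP:
            log.append(name)    # record the exit before waking the next stage
            outbox.put(STOP)    # pass the signal down the line, then quit
            break
        outbox.put(item)


source, middle, sink = queue.Queue(), queue.Queue(), queue.Queue()
exit_log = []

threads = [
    threading.Thread(target=stage, args=("first", source, middle, exit_log)),
    threading.Thread(target=stage, args=("second", middle, sink, exit_log)),
]
for t in threads:
    t.start()

# Two ordinary items, then the signal that shuts the whole chain down.
for item in ("a", "b", STOP):
    source.put(item)

for t in threads:
    t.join()

received = []
while True:
    item = sink.get()
    if item is STOP:
        break
    received.append(item)

print(received, exit_log)
```

Because the check is an identity test against False, values such as 0 or an empty string still pass through; only a literal False ends the chain. The flip side is that False can never be sent as ordinary data.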

Also note that an advantage is only really gained if the pipes are doing network tasks, where threads spend most of their time waiting (or if Python ever allows threads to run truly in parallel).
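A rough way to see where the gain comes from is to time the same work done sequentially and in a pipeline. This is only a sketch, assuming Python 3: `time.sleep` stands in for a blocking network call, the DELAY, ITEMS, and STAGES values are arbitrary, and the helper names are hypothetical. It uses plain threads and queues rather than the Pipe class.

```python
import queue
import threading
import time

DELAY = 0.02   # pretend network latency per item and stage (arbitrary value)
ITEMS = 6
STAGES = 3


def slow_step(item):
    time.sleep(DELAY)   # stand-in for blocking I/O; other threads run meanwhile
    return item + 1


def worker(inbox, outbox):
    """One pipeline stage: apply slow_step until False arrives."""
    while True:
        item = inbox.get()
        if item is False:
            outbox.put(False)
            break
        outbox.put(slow_step(item))


# Sequential baseline: every item passes through every stage, one at a time.
start = time.perf_counter()
sequential = []
for i in range(ITEMS):
    value = i
    for _ in range(STAGES):
        value = slow_step(value)
    sequential.append(value)
sequential_time = time.perf_counter() - start

# Pipelined version: one thread per stage, connected by queues.
queues = [queue.Queue() for _ in range(STAGES + 1)]
threads = [
    threading.Thread(target=worker, args=(queues[i], queues[i + 1]))
    for i in range(STAGES)
]
start = time.perf_counter()
for t in threads:
    t.start()
for i in range(ITEMS):
    queues[0].put(i)
queues[0].put(False)

pipelined = []
while True:
    item = queues[-1].get()
    if item is False:
        break
    pipelined.append(item)
pipelined_time = time.perf_counter() - start

for t in threads:
    t.join()

print("sequential: %.2fs, pipelined: %.2fs" % (sequential_time, pipelined_time))
```

The pipelined run should finish sooner because the stages wait at the same time. If slow_step were pure computation instead of waiting, the threads would take turns on the interpreter and the gap would largely disappear.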

Modifications, improvements, or comments are always welcome :)

Also, this is my first submitted recipe, so don't be too harsh. It's a simple module but very helpful, at least to me.

2 comments

Ori Peleg 15 years, 3 months ago

Nice approach! You may also be interested in Twisted: http://twistedmatrix.com

Twisted's "Deferred" allows you to do that, and more.

Andrew Moffat (author) 15 years, 3 months ago

thanks! i'll look into it