combines queues and threads for efficient data processing.
"""tp.py"""
import threading, Queue, inspect

class Pipe(threading.Thread, Queue.Queue):
    def __init__(self, input=None):
        threading.Thread.__init__(self)
        Queue.Queue.__init__(self)
        # getargspec() returns (args, varargs, varkw, defaults), so count
        # only the named args. 'self' is included: a faucet has 1, a pipe 2.
        fnargs = inspect.getargspec(self.fn)[0]
        if input is None and len(fnargs) != 1:
            raise TypeError('no arguments for fn() if pipe is faucet')
        if input is not None and len(fnargs) != 2:
            raise TypeError('1 argument for fn() if pipe has input')
        if input is not None and not isinstance(input, Queue.Queue):
            raise TypeError('queue not provided as input')
        self.input = input

    def fn(self):
        """
        this function gets overridden
        must return something to fill the queue with
        """
        pass

    def run(self):
        """
        runs the overridden fn()
        if fn() returns False, put it on the queue and quit
        so any other pipes will get it and pass it on, quitting
        if fn() returns None, don't put anything
        """
        ret = True
        while ret is not False:
            if self.input is None:  # it's a faucet
                ret = self.fn()
            elif hasattr(self.input, 'get'):  # it's a pipe
                ob = self.input.get()
                if ob is False:
                    self.put(ob)
                    break
                ret = self.fn(ob)
            if ret is not None:
                self.put(ret)
i created this class to make better use of time when dealing with network programming and whatnot. basically, i was processing information, submitting it to be uploaded somewhere, receiving a signal that it was uploaded, then doing something else...all threaded...it was a messy task. tasks like these are now simplified by the threaded pipe.
basically what happens is, the pipes are chained together and the information flows through them like water. fn() (which does the main processing) is analogous to the length of the pipe. but the pipe in its entirety could be considered a queue.
the real work is done by overriding fn(). in your own subclass, have fn() take a single variable (an object from the previous pipe) and return an object of any type to be passed onto the next pipe. if the pipe is a beginning pipe (a faucet, or one that produces information instead of simply processing and passing), have fn() take no arguments. note that the number of arguments for fn() must match what kind of pipe you created.
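here's a minimal sketch of what that subclassing might look like. it's written for python 3, so it carries its own small port of Pipe (queue instead of Queue, inspect.signature instead of getargspec). the port, and the Counter and Doubler classes, are assumptions made up for illustration, not part of the recipe.

```python
import inspect
import queue
import threading


class Pipe(threading.Thread, queue.Queue):
    """assumed python 3 port of the recipe's Pipe, for illustration only"""

    def __init__(self, input=None):
        threading.Thread.__init__(self)
        queue.Queue.__init__(self)
        # signature() of a bound method leaves out self:
        # a faucet's fn() has 0 parameters, a pipe's fn() has 1
        nparams = len(inspect.signature(self.fn).parameters)
        if input is None and nparams != 0:
            raise TypeError('no arguments for fn() if pipe is faucet')
        if input is not None and nparams != 1:
            raise TypeError('1 argument for fn() if pipe has input')
        if input is not None and not isinstance(input, queue.Queue):
            raise TypeError('queue not provided as input')
        self.input = input

    def fn(self):
        pass

    def run(self):
        ret = True
        while ret is not False:
            if self.input is None:  # it's a faucet
                ret = self.fn()
            else:  # it's a pipe
                ob = self.input.get()
                if ob is False:
                    self.put(ob)
                    break
                ret = self.fn(ob)
            if ret is not None:
                self.put(ret)


class Counter(Pipe):
    """hypothetical faucet: fn() takes no arguments and produces 1..limit"""

    def __init__(self, limit):
        Pipe.__init__(self)
        self.limit = limit
        self.n = 0

    def fn(self):
        if self.n >= self.limit:
            return False  # tells the whole chain to shut down
        self.n += 1
        return self.n


class Doubler(Pipe):
    """hypothetical pipe: fn() takes one object from the previous pipe"""

    def fn(self, ob):
        return ob * 2


def run_chain(limit=3):
    numbers = Counter(limit)
    doubled = Doubler(numbers)
    for pipe in (numbers, doubled):
        pipe.start()  # each pipe is a thread and must be started
    results = []
    while True:
        item = doubled.get()  # the last pipe is itself a queue
        if item is False:
            break
        results.append(item)
    return results


print(run_chain())  # [2, 4, 6]
```

giving Doubler no input (or giving Counter one) raises a TypeError in the constructor, because the argument count of fn() no longer matches the kind of pipe.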
a simple example of pipes in action would be something like this:
orders = OrderQueue(orderDir)
processed = OrderProcessor(orders)
uploaded = Uploader(processed)
finalized = Finalizer(uploaded)
note that OrderQueue, OrderProcessor, Uploader, and Finalizer are all threaded pipes, each with its own overridden fn(). each pipe is a thread, so nothing flows until you call start() on it. the beauty is that they work in parallel (not TRUE parallel, since CPython's global interpreter lock lets only one thread run python code at a time). Uploader could be uploading any orders IN OrderProcessor, while OrderProcessor is processing any orders IN OrderQueue, etc. it makes maximum use of time.
to kill a chain of threaded pipes, simply have the faucet's fn() return False. upon seeing a False, the faucet will send the False along and exit its thread. any other pipes along the line, seeing a False, will do the same.
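the shutdown behaviour can be sketched without the Pipe class at all. the snippet below is python 3 and uses plain threads and queues. the stage() helper and the names in it are made up for illustration, but the rule is the one described above: on seeing False, pass it along and quit.

```python
import queue
import threading


def stage(fn, inbox, outbox, exited, name):
    """apply fn to every item; on False, forward the False and exit"""
    while True:
        ob = inbox.get()
        if ob is False:
            exited.append(name)  # record the exit before waking the next stage
            outbox.put(False)
            break
        outbox.put(fn(ob))


def run_and_kill(words):
    source, middle, final = queue.Queue(), queue.Queue(), queue.Queue()
    exited = []
    threads = [
        threading.Thread(target=stage,
                         args=(str.upper, source, middle, exited, 'upper')),
        threading.Thread(target=stage,
                         args=(lambda s: s + '!', middle, final, exited, 'bang')),
    ]
    for t in threads:
        t.start()
    for word in words:
        source.put(word)
    source.put(False)  # the kill signal enters at the head of the chain
    for t in threads:
        t.join()  # both threads end once the False has passed through
    out = []
    while True:
        item = final.get()
        if item is False:  # the False also marks the end of the output
            break
        out.append(item)
    return out, exited


print(run_and_kill(['a', 'b']))  # (['A!', 'B!'], ['upper', 'bang'])
```

since the False travels through the same queues as the data, every item queued ahead of it is processed before each stage exits, and the stages shut down in chain order.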
also note that an advantage is only really gained if you're doing some network tasks with the pipes (or if python ever allows for true multiprocessing).
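to get a feel for where the gain comes from, here's a rough python 3 sketch that compares doing two slow steps one after another with doing them as two chained stages. time.sleep() stands in for a blocking network call, and process(), upload(), and the DELAY value are all invented for the comparison. the actual timings will vary from machine to machine.

```python
import queue
import threading
import time

DELAY = 0.05  # assumed cost of one blocking network round trip


def process(order):
    time.sleep(DELAY)  # waiting on I/O releases the interpreter lock
    return order + ':processed'


def upload(order):
    time.sleep(DELAY)
    return order + ':uploaded'


def sequential(orders):
    return [upload(process(o)) for o in orders]


def worker(fn, inbox, outbox):
    while True:
        ob = inbox.get()
        if ob is False:
            outbox.put(False)
            return
        outbox.put(fn(ob))


def pipelined(orders):
    source, middle, done = queue.Queue(), queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(process, source, middle)),
               threading.Thread(target=worker, args=(upload, middle, done))]
    for t in threads:
        t.start()
    for o in orders:
        source.put(o)
    source.put(False)
    for t in threads:
        t.join()
    results = []
    while True:
        item = done.get()
        if item is False:
            return results
        results.append(item)


def timed(fn, orders):
    start = time.perf_counter()
    out = fn(orders)
    return out, time.perf_counter() - start


orders = ['order%d' % i for i in range(4)]
seq_out, seq_time = timed(sequential, orders)
pipe_out, pipe_time = timed(pipelined, orders)
print('sequential %.2fs, pipelined %.2fs' % (seq_time, pipe_time))
```

with waits like these, the second stage can upload one order while the first is still waiting on the next, so the pipelined run should finish sooner. if process() and upload() were pure number crunching instead, the two threads would mostly take turns and the gain would largely disappear.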
modifications, improvements, or comments are always welcome :)
also this is my first submitted recipe, so don't be too harsh. it's a simple module but very helpful, at least to me.