Imagine you have one wire, and you want to send a message from a machine A to another machine B and wait for an answer from B (hereafter a "request/response transaction"). Now imagine that many threads on A can make this kind of transaction "simultaneously". You then have to be careful, otherwise a thread may receive the response meant for another thread's transaction. One solution is to serialize the transactions: the threads on A wait for their turn to make their request/response transaction.
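For contrast, here is a minimal Python 3 sketch of that serialized approach. It is not part of the recipe, which targets Python 2. `LoopbackWire` and `SerializedClient` are hypothetical names, and two in-process queues stand in for the wire between A and B.

```python
import queue
import threading

class LoopbackWire:
    """Hypothetical in-process stand-in for the wire: requests go into one
    queue, and a server thread playing machine B answers on another."""
    def __init__(self, handler):
        self._requests = queue.Queue()
        self._responses = queue.Queue()
        self._handler = handler
        threading.Thread(target=self._serve, daemon=True).start()

    def _serve(self):
        while True:
            request = self._requests.get()
            self._responses.put(self._handler(request))

    def send(self, data):
        self._requests.put(data)

    def recv(self):
        return self._responses.get()

class SerializedClient:
    """Only one request/response transaction is on the wire at a time."""
    def __init__(self, wire):
        self._wire = wire
        self._lock = threading.Lock()

    def transaction(self, request):
        # Holding the lock across send() and recv() guarantees that the
        # next response read from the wire answers this request.
        with self._lock:
            self._wire.send(request)
            return self._wire.recv()

client = SerializedClient(LoopbackWire(lambda n: n * 2))
results = {}

def caller(n):
    results[n] = client.transaction(n)

threads = [threading.Thread(target=caller, args=(n,)) for n in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Correct, but every thread waits for the whole round trip of every other thread, which is the cost the next approach avoids.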
Another solution is to map each thread requesting a transaction on A to a thread handling it on B. This allows several transactions to be in flight at the same time on the same wire. This is what this module does: on A lives a multiplexer, and threads call its transaction() method to initiate the request/response; on B, the process_transaction() method of a demultiplexer gets called by worker threads. All this happens in a multithreaded way over a single wire, and the module takes care of managing the interleaved requests and responses.
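The core idea, tagging each request with a transaction ID and routing the matching response back to the waiting caller, can be sketched in a few lines of Python 3. This is a simplified illustration, not the recipe's code: `TinyMux`, `tiny_demux` and `slow_square` are hypothetical names, and two queues stand in for the wire, so there is no serialization or shutdown handling.

```python
import itertools
import queue
import random
import threading
import time

class TinyMux:
    """Hypothetical sketch of side A: callers share one wire, and each
    response is routed back to its caller by transaction ID (xid)."""
    def __init__(self, to_b, from_b):
        self._to_b, self._from_b = to_b, from_b
        self._lock = threading.Lock()
        self._pending = {}            # xid -> [Event, response]
        self._ids = itertools.count()
        threading.Thread(target=self._dispatch, daemon=True).start()

    def transaction(self, payload):
        evt = threading.Event()
        with self._lock:
            xid = next(self._ids)
            self._pending[xid] = [evt, None]   # register before sending
        self._to_b.put((xid, payload))
        evt.wait()                             # set by _dispatch()
        with self._lock:
            return self._pending.pop(xid)[1]

    def _dispatch(self):
        # Single receiver thread: wake up whichever caller owns the xid.
        while True:
            xid, response = self._from_b.get()
            with self._lock:
                slot = self._pending[xid]
                slot[1] = response
                slot[0].set()

def tiny_demux(to_b, from_b, handler):
    """Hypothetical sketch of side B: one worker thread per request, so
    responses can go back in a different order than the requests came."""
    def work(xid, payload):
        from_b.put((xid, handler(payload)))
    while True:
        xid, payload = to_b.get()
        threading.Thread(target=work, args=(xid, payload), daemon=True).start()

def slow_square(n):
    time.sleep(random.uniform(0, 0.01))   # scramble completion order
    return n * n

to_b, from_b = queue.Queue(), queue.Queue()
threading.Thread(target=tiny_demux, args=(to_b, from_b, slow_square),
                 daemon=True).start()
mux = TinyMux(to_b, from_b)
results = {}

def caller(n):
    results[n] = mux.transaction(n)

threads = [threading.Thread(target=caller, args=(n,)) for n in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Even though responses come back out of order, each caller gets its own answer because the lookup is by xid, not by arrival order.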
# Author: David Decotigny, Oct 1, 2008
# @brief Multiplexer for parallel transactions over a single data
# channel. This is like a pipe on which we provide a multithreaded
# request/response messaging system. This system allows multiple
# threads to issue several requests in parallel: they are treated in
# parallel on the receiving side and the responses are sent back to
# their respective requesting thread. The exceptions are correctly
# transferred: the _trace member of the exception object will
# indicate the traceback (as text).
#
# The basic synopsis is:
# - call Mux::transaction(*args, **kwds) from a "sender" thread
# - the transaction is sent to the DeMux via the channel
# - DeMux::process_transaction(*args, **kwds) gets called by another thread,
#   on the other side of the channel (in general in
#   another process/machine)
# - the result/exception of process_transaction() is sent back to the
#   sender via the channel
# - Mux::transaction() on the sender returns the result or raises the
#   exception raised by DeMux::process_transaction()
#
# The user code of this module has to override the
# DeMux::process_transaction() method for the whole system to be
# useful.
#
# This module makes the following assumptions on the channel used to
# transmit the requests/responses:
# - the channel is bidirectional: both parties can send and receive data on it
# - the channel can transmit arbitrary serializable python objects
# - the channel consists of 2 endpoints having the same API: one
#   endpoint for the Multiplexer, one endpoint for the DeMultiplexer
# - the endpoints of the channel have the following methods:
#     fileno(): return a file descriptor suitable for select/poll of data
#               ready to be received in non-blocking mode (at least for the
#               first byte)
#     send(data): send the given python data to the receiving party
#     data = recv(): wait for python data from the sending party and return it
#     close(): close the endpoint in both send/receive directions
# - send() is multithread-safe
#
# To achieve parallel handling of "simultaneous" requests, the
# demultiplexer handles each request in a separate thread: either the
# threads are created on demand (nworkers = None), or a pool of
# pre-allocated threads is used (nworkers = integer). To manage the
# interleaving of the transactions, each transaction has its own ID,
# the "xid".
import sys, os, threading, Queue, itertools, traceback, select, struct
import cPickle as pickle # Only for SimpleChannelEndpoint

__all__ = ["Mux", "DeMux", "ChannelPair"]

## Magic token to mark the end of job submission by the DeMux
SENTINEL = "QUIT"

def is_sentinel(obj):
    """Predicate True when a DeMux worker thread receives a
    "terminate" order from the DeMux"""
    return type(obj) is str and obj == SENTINEL
class ReceiverThread(threading.Thread):
    """Generic wrapper class to wait for data from a channel:
    handle_message() is called for each data received. Provides a
    stop() method to stop receiving the data. This is a thread
    object: call start() to start it"""

    def __init__(self, channel, *args, **kwds):
        """
        \param channel is a Channel endpoint (fileno/recv/close
        methods expected)
        """
        threading.Thread.__init__(self, *args, **kwds)
        self._channel = channel
        self.__terms = os.pipe()
        self._recv = channel.recv
        self._send = channel.send

    def run(self):
        """
        Wait for either a call to stop() or for data to be available
        on the channel, then call handle_message(), and loop.
        """
        # Initialize poll()
        fd = self._channel.fileno()
        waitset = select.poll()
        eventmask = select.POLLIN | select.POLLERR \
                    | select.POLLHUP | select.POLLPRI
        waitset.register(fd, eventmask)
        waitset.register(self.__terms[0], eventmask)
        while 1:
            exit_loop = False
            for fd_, evt in waitset.poll():
                if fd_ != fd:
                    # Received something on the __terms pipe
                    exit_loop = True
                    break
                if evt != select.POLLIN:
                    # Received something on the channel, but not normal
                    # data (probably a HUP)
                    exit_loop = True
                    break
            if exit_loop:
                break

            # An exception raised by recv() terminates the thread
            data = self._recv()

            # Call handle_message (dump the exceptions, but ignore them)
            try:
                self.handle_message(data)
            except:
                traceback.print_exc()
        # End while

    def handle_message(self, message):
        """Method to override: called each time a message is received"""
        raise NotImplementedError("Children classes expected to override it")

    def stop(self):
        """Stop receiving data. Waits until the thread is
        terminated. DO NOT CALL THIS from inside handle_message()"""
        os.write(self.__terms[1], "TERMINATION")
        self._channel.close()
        self.join()
class Mux(ReceiverThread):
    """Thread that multiplexes calls to the transaction() method on
    the given channel"""

    def __init__(self, channel):
        """
        \param channel is a Channel endpoint (fileno/recv/close
        methods expected)
        """
        ReceiverThread.__init__(self, channel)
        self.__lock = threading.Lock()
        self.__waitq = dict()
        self.__idgen = itertools.count(42)

    def transaction(self, *args, **kwds):
        """Call this method to send the given args on the wire and
        wait for a response"""
        # threading.Event() takes no lock argument
        evt = threading.Event()

        # Allocate a transaction ID
        self.__lock.acquire()
        try:
            xid = self.__idgen.next()
            assert xid not in self.__waitq
            self.__waitq[xid] = [evt, None] # AttributeError: MUX stopped
        except AttributeError:
            raise EOFError("MUX has been stopped.")
        finally:
            self.__lock.release()

        # Send the request
        self._send((xid, args, kwds))

        # Wait for the answer
        evt.wait()

        # Return the answer/raise the exception to the caller
        self.__lock.acquire()
        try:
            # Retrieve the result
            try:
                result = self.__waitq[xid][1]
            except (AttributeError, IndexError):
                raise EOFError("MUX has been stopped.")

            # Work done
            del self.__waitq[xid]

            # Reformat the result
            xid_, result_ = result
            assert xid_ == xid, \
                   "Expected txn id %s != received (%s)" % (xid, xid_)
            status, details = result_
            if status == "OK":
                return details
            elif status == "EXCEPTION":
                raise details
            else:
                raise RuntimeError("Invalid status %s !" % repr(status))
        finally:
            self.__lock.release()

    def run(self):
        """Listen to the messages coming from the endpoint and
        dispatch them to the threads which sent them"""
        try:
            ReceiverThread.run(self)
        except:
            traceback.print_exc()

        # If we're here, a stop has been requested (or the channel
        # failed): unblock _all_ the waiting caller threads and force
        # them to fail in transaction()
        self.__lock.acquire()
        try:
            for xid, slot in self.__waitq.iteritems():
                del slot[1] # Force IndexError on the waiting threads
                slot[0].set()
            del self.__waitq # Force AttributeError on next transaction()
        finally:
            self.__lock.release()

    def handle_message(self, msg):
        """Needed by the ReceiverThread object: dispatch the messages
        to the caller threads"""
        xid, result = msg
        self.__lock.acquire()
        try:
            slot = self.__waitq[xid]
            slot[1] = msg
            slot[0].set() # wake up the caller thread
        finally:
            self.__lock.release()
class DeMux(ReceiverThread):
    """Thread that demultiplexes transactions coming from a
    multiplexer, and calls process_transaction() for each of them. The
    transactions are processed in parallel in different worker
    threads. The worker threads either form a pool of threads (when
    nworkers is not None), or are created on demand when requests
    arrive (when nworkers is None)"""

    __lock = None      # Lock object
    __workq = None     # Queue object or None (in on-demand mode)
    __nworkers = None  # Specified size of the pool of threads
    __workers = None   # Either a list of threads (pool) or a dict xid->thread
                       # (in on-demand mode)

    def __init__(self, channel, nworkers = None):
        """
        \param channel is a Channel endpoint (fileno/recv/close
        methods expected)
        \param nworkers (integer) number of threads in the pool able
        to process the transaction requests, or None when threads have
        to be created on demand
        """
        ReceiverThread.__init__(self, channel)
        self.__nworkers = nworkers
        self.__lock = threading.Lock()
        if nworkers is not None:
            self.__workers = []
            self.__workq = Queue.Queue()
            for idworker in range(nworkers):
                thr = threading.Thread(target=self._pool_work)
                self.__workers.append(thr)
                thr.start()
        else:
            self.__workers = dict()

    def handle_message(self, msg):
        """Required by ReceiverThread"""
        xid, args, kwds = msg
        if self.__nworkers is not None:
            # In pool mode: send the job to the pool
            self.__workq.put((xid, args, kwds))
        else:
            # In on-demand mode: spawn a new thread to do the job
            thr = threading.Thread(target=self._do_process_transaction,
                                   args=(xid,)+args, kwargs=kwds)

            # Register the thread for this transaction
            self.__lock.acquire()
            try:
                self.__workers[xid] = thr
            finally:
                self.__lock.release()

            try:
                thr.start()
            except:
                # Oops, cannot start worker...
                self.__lock.acquire()
                try:
                    del self.__workers[xid]
                finally:
                    self.__lock.release()

                # Sending exception back to sender
                ex = sys.exc_info()[1]
                if ex is not None:
                    ex._trace = traceback.format_exc()
                else:
                    ex = sys.exc_info()[0]
                self._send((xid, ("EXCEPTION", ex)))
    def _pool_work(self):
        """Method run by the pool worker threads in pool mode"""
        while 1:
            # Simply consume the jobs from the queue until we get the
            # sentinel token
            data = self.__workq.get()
            if is_sentinel(data):
                break
            xid, args, kwds = data
            # Raises an exception only on connection problems:
            self._do_process_transaction(xid, *args, **kwds)

    def _do_process_transaction(self, xid, *args, **kwds):
        """Method run by the worker threads to process one transaction"""
        # Call process_transaction and prepare the result to send
        result = None
        try:
            result = ("OK", self.process_transaction(*args, **kwds))
        except Exception, ex:
            ex._trace = traceback.format_exc() # the text, not the function
            result = ("EXCEPTION", ex)
        except:
            ex = sys.exc_info()[1]
            if ex is not None:
                ex._trace = traceback.format_exc()
            else:
                ex = sys.exc_info()[0]
            result = ("EXCEPTION", ex)
        finally:
            if result is None:
                ex = RuntimeError("Unexpected error !")
                result = ("EXCEPTION", ex)

        # Send response
        self._send((xid, result))

        # Unregister the thread in on-demand mode
        if self.__nworkers is None:
            self.__lock.acquire()
            try:
                # pop() rather than del: stop() may have removed it already
                self.__workers.pop(xid, None)
            finally:
                self.__lock.release()

    def process_transaction(self, *args, **kwds):
        """Implement this method in order to generate a response from
        the given transaction arguments"""
        raise NotImplementedError("Children must implement this method")
    def stop(self):
        """Stop the worker threads and close the channel"""
        ReceiverThread.stop(self)

        #
        # No lock because the listening thread is dead already (no new
        # thread)
        #

        # Clearing job queue
        if self.__workq is not None:
            while 1:
                try:
                    self.__workq.get_nowait()
                except Queue.Empty:
                    break

        # Stopping workers
        if self.__nworkers is not None:
            for i in range(self.__nworkers):
                self.__workq.put(SENTINEL)
            for thr in self.__workers:
                thr.join()
        else:
            while self.__workers:
                xid, thr = self.__workers.popitem()
                thr.join()
class SimpleChannelEndpoint:
    """Construct a channel compliant with the channel specifications
    from a pair of r/w file descriptors"""

    SZI = struct.calcsize('I')

    def __init__(self, fd_r, fd_w):
        """
        \param fd_r, fd_w The read and write file descriptors used for
        this endpoint
        """
        self._fd_r = fd_r
        self._fd_w = fd_w
        self._wlock = threading.Lock() # send() has to be thread-safe

    def fileno(self):
        """Return a file descriptor suitable for select/poll of data
        ready to be received in non-blocking mode (at least for the
        first byte)"""
        return self._fd_r

    def send(self, data):
        """send the given python data to the receiving party"""
        sdata = pickle.dumps(data)
        sdata = struct.pack('I', len(sdata)) + sdata
        self._wlock.acquire()
        try:
            # os.write() may write fewer bytes than requested: loop
            while sdata:
                sdata = sdata[os.write(self._fd_w, sdata):]
        finally:
            self._wlock.release()

    def _read_exactly(self, size):
        """Read exactly size bytes, raise EOFError if the peer closed"""
        sdata = ""
        while len(sdata) < size:
            chunk = os.read(self._fd_r, size - len(sdata))
            if not chunk:
                raise EOFError("Channel closed by the other side")
            sdata += chunk
        return sdata

    def recv(self):
        """wait for python data from the sending party and return it"""
        (expected,) = struct.unpack('I', self._read_exactly(self.SZI))
        return pickle.loads(self._read_exactly(expected))

    def close(self):
        """close the endpoint in both send/receive directions"""
        self._wlock.acquire()
        try:
            os.close(self._fd_w)
        finally:
            self._wlock.release()
        os.close(self._fd_r)

def ChannelPair():
    """Very simple function returning a connected pair of channels"""
    r1, w2 = os.pipe()
    r2, w1 = os.pipe()
    return ( SimpleChannelEndpoint(r1, w1), SimpleChannelEndpoint(r2, w2) )
def _test():
    """
    Some tests
    """
    import time, thread

    c1, c2 = ChannelPair()
    mux = Mux(c1)

    class MyDeMux(DeMux):
        """A demultiplexer in which each transaction is a call to sleep()"""
        def process_transaction(self, message_before, duration, message_after):
            """One transaction is just a call to sleep"""
            print "[%d] BEGIN: %s (sleep %fs)" % (thread.get_ident(),
                                                  message_before, duration)
            time.sleep(duration)
            print "[%d] END: %s" % (thread.get_ident(), message_after)

    class Submitter(threading.Thread):
        """A thread that submits 3 transactions to the mux object, then
        one that is expected to fail"""
        def run(self):
            """Submit 3 transactions, then one with a negative duration"""
            mux.transaction("msg1", 3, "msg2")
            mux.transaction("msg3", 2, "msg4")
            mux.transaction("msg5", 1, "msg6")
            try:
                mux.transaction("msgE", -1, "msgEE")
            except IOError, ex:
                print "Got expected exception from the DeMux: %s" % repr(ex)

    demux = MyDeMux(c2, 100)
    # demux = MyDeMux(c2)

    # Starting mux/demux
    mux.start()
    demux.start()

    # Starting as many submission threads as possible (up to 700)
    children = []
    for i in range(700):
        thr = Submitter()
        try:
            thr.start()
            children.append(thr)
        except:
            break
    print "Started %d submission threads" % len(children)

    # Waiting for the children
    for thr in children:
        try:
            thr.join()
        except KeyboardInterrupt:
            print "User interruption."
            break

    # Stopping mux/demux
    mux.stop()
    demux.stop()
    print "Bye."

if __name__ == "__main__":
    _test()
This recipe has been tested on Linux with Python 2.5.2 and 2.6b3.
It defines two classes, Mux and DeMux. Calling the transaction() method of a Mux instance causes the process_transaction() method of a DeMux instance to be called. The result or exception of process_transaction() on the DeMux side is sent back, and transaction() on the Mux side returns that result or raises that exception.
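The round trip of a result or an exception relies on a small envelope: the worker wraps the outcome as `("OK", value)` or `("EXCEPTION", exc)`, attaching the traceback text as `exc._trace`, and the caller unwraps it. The Python 3 sketch below mirrors that convention from the recipe's `_do_process_transaction()` and `transaction()`; the helper names `run_and_wrap` and `unwrap` are hypothetical, and a pickle round trip stands in for the channel.

```python
import pickle
import traceback

def run_and_wrap(func, *args, **kwds):
    """Worker side: turn one call into ("OK", value) or ("EXCEPTION", exc)."""
    try:
        return ("OK", func(*args, **kwds))
    except Exception as ex:
        ex._trace = traceback.format_exc()   # traceback text travels along
        return ("EXCEPTION", ex)

def unwrap(status_and_details):
    """Caller side: return the value or re-raise the remote exception."""
    status, details = status_and_details
    if status == "OK":
        return details
    if status == "EXCEPTION":
        raise details
    raise RuntimeError("Invalid status %r" % (status,))

# pickle stands in for the channel; the _trace attribute survives because
# it is stored in the exception's __dict__, which pickle preserves.
print(unwrap(pickle.loads(pickle.dumps(run_and_wrap(int, "42")))))
try:
    unwrap(pickle.loads(pickle.dumps(run_and_wrap(int, "forty-two"))))
except ValueError as ex:
    print(ex._trace.splitlines()[-1])
```

Only the traceback text crosses the wire, because traceback objects themselves cannot be pickled.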
The Mux and DeMux are connected to one another by a single bidirectional data link, such as a pair of pipes. The assumptions about this channel are described at the beginning of the source. The module provides a simple example of such a channel based on Unix pipes (SimpleChannelEndpoint and ChannelPair()), but it is by no means a reference implementation.
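As an illustration of that channel contract (fileno/send/recv/close, thread-safe send), here is a Python 3 sketch of an alternative endpoint built on a connected stream socket instead of pipes. `SocketEndpoint` and `socket_channel_pair` are hypothetical names; like `SimpleChannelEndpoint`, it frames each pickled object with a length prefix, and it raises `EOFError` when the peer closes.

```python
import pickle
import socket
import struct
import threading

class SocketEndpoint:
    """Hypothetical channel endpoint over a connected stream socket."""
    _HDR = struct.Struct("!I")   # 4-byte big-endian length prefix

    def __init__(self, sock):
        self._sock = sock
        self._wlock = threading.Lock()   # send() must be thread-safe

    def fileno(self):
        # Usable with select/poll, as the Mux/DeMux receiver threads expect
        return self._sock.fileno()

    def send(self, data):
        payload = pickle.dumps(data)
        with self._wlock:   # keep each frame contiguous on the wire
            self._sock.sendall(self._HDR.pack(len(payload)) + payload)

    def _read_exactly(self, n):
        buf = b""
        while len(buf) < n:
            chunk = self._sock.recv(n - len(buf))
            if not chunk:
                raise EOFError("peer closed the channel")
            buf += chunk
        return buf

    def recv(self):
        (size,) = self._HDR.unpack(self._read_exactly(self._HDR.size))
        return pickle.loads(self._read_exactly(size))

    def close(self):
        self._sock.close()

def socket_channel_pair():
    """Connected pair of endpoints, analogous to ChannelPair()."""
    a, b = socket.socketpair()
    return SocketEndpoint(a), SocketEndpoint(b)

left, right = socket_channel_pair()
left.send({"xid": 42, "args": ("msg1", 3)})
print(right.recv())
```

The lock around `sendall()` matters: without it, two threads sending at once could interleave the bytes of their frames.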
What this module provides is a way for Mux::transaction() to be called simultaneously from different threads: the requests and responses are interleaved on the channel, and the module matches each response to its request. As a result, DeMux::process_transaction() is called simultaneously by different threads too. On the DeMux side, the threads are either created on the fly for each request (when the constructor's nworkers argument is None), or taken from a pool of pre-allocated threads (when nworkers is an integer). The thread pool implementation is ad hoc and self-contained in the module. Exceptions raised in process_transaction() are transmitted back to the sender, with the remote traceback text attached as their _trace attribute.
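The pool mode can be sketched in Python 3 as a fixed set of threads consuming one job queue until each receives a sentinel, as `DeMux._pool_work()` does. This is a simplified illustration with a hypothetical `WorkerPool` class: it uses a unique `object()` as the sentinel instead of the recipe's `"QUIT"` string, and unlike `DeMux.stop()`, which first discards queued jobs, its `stop()` lets already-queued jobs finish.

```python
import queue
import threading

SENTINEL = object()   # unique "quit" marker that no real job can equal

class WorkerPool:
    """Hypothetical sketch of the pool mode (nworkers = integer)."""
    def __init__(self, nworkers, handler):
        self._jobs = queue.Queue()
        self._handler = handler
        self._threads = [threading.Thread(target=self._work)
                         for _ in range(nworkers)]
        for t in self._threads:
            t.start()

    def submit(self, *args):
        self._jobs.put(args)

    def _work(self):
        while True:
            job = self._jobs.get()
            if job is SENTINEL:
                break
            self._handler(*job)

    def stop(self):
        # One sentinel per worker: each thread consumes exactly one and
        # exits. The queue is FIFO, so all earlier jobs are taken first.
        for _ in self._threads:
            self._jobs.put(SENTINEL)
        for t in self._threads:
            t.join()

done = []
done_lock = threading.Lock()

def handle(xid, n):
    with done_lock:
        done.append((xid, n * 10))

pool = WorkerPool(4, handle)
for xid in range(10):
    pool.submit(xid, xid)
pool.stop()
```

The on-demand mode trades this fixed thread count for one `threading.Thread` per request, which is simpler but unbounded under load.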
The only thing that the user needs to do is override the process_transaction() method of DeMux.
It is straightforward to replace the ad hoc thread pool implementation with the threadpool API of recipe #576519: use the apply_async() method of the pool.
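The shape of that change can be sketched in Python 3 using the standard library's `multiprocessing.pool.ThreadPool`, which also offers `apply_async()`. This is a swapped-in stand-in, not the API of recipe #576519, and `PooledDispatcher` is a hypothetical class that only mimics the DeMux's `handle_message()` and `stop()` roles.

```python
import threading
from multiprocessing.pool import ThreadPool

class PooledDispatcher:
    """Hypothetical sketch: hand each (xid, args, kwds) message to a
    standard-library ThreadPool instead of a hand-made worker queue."""
    def __init__(self, nworkers, process_transaction, send):
        self._pool = ThreadPool(nworkers)
        self._process = process_transaction
        self._send = send   # must be thread-safe, like the channel's send()

    def handle_message(self, msg):
        xid, args, kwds = msg
        self._pool.apply_async(self._do_process, (xid,) + tuple(args), kwds)

    def _do_process(self, xid, *args, **kwds):
        try:
            result = ("OK", self._process(*args, **kwds))
        except Exception as ex:
            result = ("EXCEPTION", ex)
        self._send((xid, result))

    def stop(self):
        self._pool.close()   # accept no more jobs
        self._pool.join()    # wait for queued jobs to finish

replies = []
reply_lock = threading.Lock()

def send(msg):
    with reply_lock:
        replies.append(msg)

dispatcher = PooledDispatcher(4, lambda a, b=0: a + b, send)
dispatcher.handle_message((1, (2,), {"b": 3}))     # 2 + 3
dispatcher.handle_message((2, ("x",), {"b": 1}))   # "x" + 1 raises TypeError
dispatcher.stop()
```

The pool then owns thread lifetime and shutdown, so the sentinel handling in `_pool_work()` and the queue draining in `stop()` would no longer be needed.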