A simple lock-queue (FIFO) context manager implemented with Memcached.
In essence this is a normal lock, where the requests to acquire the lock are granted in the order in which they were originally made. Note that requests to acquire the lock are always blocking.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import time
class memcached_queue_context():
"A context manager for queuing operations per key via memcached"
poll_interval = 0.005 # in seconds
def __init__(self, memcache_client, key_base):
self.mc = memcache_client
self.key_base = key_base
self.queue_push_key = self.key_base + "-push"
self.queue_wait_key = self.key_base + "-wait"
def __enter__(self):
# initialize the queues if needed
self.mc.add(self.queue_push_key, '1')
self.mc.add(self.queue_wait_key, '1')
# take a number
index = self.mc.incr(self.queue_push_key) - 1
# poll the queue until your number comes up
while True:
idx = int(self.mc.get(self.queue_wait_key)) # int() is critical!!!
if not idx < index:
break
time.sleep(self.poll_interval)
return
def __exit__(self, exc_type, exc_val, exc_tb):
# advance the queue
self.mc.incr(self.queue_wait_key)
return False
|
Since this implementation uses Memcached, you will need to have at least one memcached process running (either locally or on a separate host).
This is implemented as a context manager to ensure (as far as possible) that the code holding the lock-queue will release it when it's done. If you need to be sure that the lock-queue doesn't become stuck forever, e.g. because a process died while holding the lock, have a separate process/thread monitor the queue directly via memcached and release it if it becomes stuck. This is left as an exercise for the reader ;)
I got - TypeError: int() argument must be a string or a number, not 'NoneType'