A simple lock-queue (FIFO) context manager implemented with Memcached.
In essence this is a normal lock, except that requests to acquire it are granted in the order in which they were made. Acquiring the lock always blocks: the caller polls memcached until its turn comes up.
import time


class memcached_queue_context():
    "A context manager for queuing operations per key via memcached"

    poll_interval = 0.005  # in seconds

    def __init__(self, memcache_client, key_base):
        self.mc = memcache_client
        self.key_base = key_base
        self.queue_push_key = self.key_base + "-push"
        self.queue_wait_key = self.key_base + "-wait"

    def __enter__(self):
        # initialize the queues if needed
        self.mc.add(self.queue_push_key, '1')
        self.mc.add(self.queue_wait_key, '1')
        # take a number
        index = self.mc.incr(self.queue_push_key) - 1
        # poll the queue until your number comes up
        while True:
            idx = int(self.mc.get(self.queue_wait_key))  # int() is critical!!!
            if not idx < index:
                break
            time.sleep(self.poll_interval)
        return

    def __exit__(self, exc_type, exc_val, exc_tb):
        # advance the queue
        self.mc.incr(self.queue_wait_key)
        return False
Since this implementation uses Memcached, you will need to have at least one memcached process running (either locally or on a separate host).
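As a rough usage sketch, the lock-queue can guard a read-modify-write on another memcached key. The helper name `locked_increment`, the `"lock-"` key prefix, and passing the context manager class in as `lock_cls` are assumptions made for illustration. The client is assumed to offer `get` and `set` in addition to the `add` and `incr` calls the class already relies on.

```python
def locked_increment(mc, lock_cls, counter_key):
    """Increment the value stored under counter_key while holding the lock-queue.

    mc          -- a memcached client object with get() and set()
    lock_cls    -- the context manager class (e.g. memcached_queue_context)
    counter_key -- the key whose value is read, incremented and written back
    """
    # Hypothetical naming scheme: one lock-queue per protected key.
    with lock_cls(mc, "lock-" + counter_key):
        # Only one caller at a time gets here, in ticket order.
        current = int(mc.get(counter_key) or 0)
        mc.set(counter_key, str(current + 1))
        return current + 1
```

With the python-memcached package, for example, `mc` could be created as `memcache.Client(["127.0.0.1:11211"])` and the helper called as `locked_increment(mc, memcached_queue_context, "hits")`. For a plain counter, memcached's own `incr` is already atomic, so the lock only pays off when the protected section does more than that.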
This is implemented as a context manager to ensure (as far as possible) that the code holding the lock-queue will release it when it's done. If you need to be sure that the lock-queue doesn't become stuck forever, e.g. because a process died while holding the lock, have a separate process/thread monitor the queue directly via memcached and release it if it becomes stuck. This is left as an exercise for the reader ;)
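One possible starting point for that exercise is sketched below, under stated assumptions. The function name `watch_queue` and its parameters `stall_timeout`, `poll_interval` and `max_checks` are hypothetical. It treats the queue as stuck when the "-wait" counter has not moved for `stall_timeout` seconds while at least one ticket is outstanding, and then advances the counter itself.

```python
import time


def watch_queue(mc, key_base, stall_timeout, poll_interval=0.5, max_checks=None):
    """Force-advance a stalled lock-queue; return the number of forced releases.

    Runs forever unless max_checks is given (useful for bounded runs).
    """
    push_key = key_base + "-push"
    wait_key = key_base + "-wait"
    last_wait = None
    last_change = time.monotonic()
    releases = 0
    checks = 0
    while max_checks is None or checks < max_checks:
        checks += 1
        push_raw = mc.get(push_key)
        wait_raw = mc.get(wait_key)
        now = time.monotonic()
        if push_raw is None or wait_raw is None:
            # The queue has not been initialized yet: nothing to watch.
            last_wait, last_change = None, now
        else:
            push, wait = int(push_raw), int(wait_raw)
            if wait != last_wait or wait >= push:
                # Either the queue made progress, or nobody holds or
                # awaits the lock: restart the stall timer.
                last_wait, last_change = wait, now
            elif now - last_change >= stall_timeout:
                # No progress for too long while a ticket is outstanding:
                # release on behalf of the presumed-dead holder.
                mc.incr(wait_key)
                releases += 1
                last_wait, last_change = wait + 1, now
        time.sleep(poll_interval)
    return releases
```

Two caveats apply to this sketch. A holder that is merely slow, not dead, will still call `__exit__` after a forced release, advancing the counter a second time and letting two later callers in at once. So `stall_timeout` should comfortably exceed the longest legitimate hold time. The check and the `incr` are also not atomic, so a release that lands between them has the same effect.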