
A simple lock-queue (FIFO) context manager implemented with Memcached.

In essence this is a normal lock whose acquire requests are granted in the order in which they were made. Note that acquiring the lock always blocks: there is no non-blocking or timeout variant.

Python
import time

class memcached_queue_context(object):
    """A context manager for queuing operations per key via memcached."""

    poll_interval = 0.005 # in seconds

    def __init__(self, memcache_client, key_base):
        self.mc = memcache_client
        self.key_base = key_base

        self.queue_push_key = self.key_base + "-push"
        self.queue_wait_key = self.key_base + "-wait"

    def __enter__(self):
        # initialize the queues if needed
        self.mc.add(self.queue_push_key, '1')
        self.mc.add(self.queue_wait_key, '1')

        # take a number
        index = self.mc.incr(self.queue_push_key) - 1

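        # poll the queue until your number comes up
        while True:
            raw = self.mc.get(self.queue_wait_key)
            if raw is None:
                # the counter is missing: memcached is unreachable or the key was evicted
                raise RuntimeError("lock-queue counter %r is missing" % self.queue_wait_key)
            # int() is critical: get() may return the counter as a string
            if int(raw) >= index:
                break
            time.sleep(self.poll_interval)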

    def __exit__(self, exc_type, exc_val, exc_tb):
        # advance the queue
        self.mc.incr(self.queue_wait_key)
        return False
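
To see why both counters start at '1' and why the ticket is `incr(...) - 1`, here is a small trace of the two counters for three callers. It is only a sketch: `DictClient` is a hypothetical in-memory stand-in that mimics the `add`/`incr`/`get` calls used above, and the key base `job` is made up.

```python
class DictClient:
    """Hypothetical in-memory stand-in; not a real memcached client."""

    def __init__(self):
        self.data = {}

    def add(self, key, value):
        # like memcached's add: store only if the key does not exist yet
        if key in self.data:
            return False
        self.data[key] = value
        return True

    def incr(self, key):
        # increment the stored decimal string and return the new value
        new_value = int(self.data[key]) + 1
        self.data[key] = str(new_value)
        return new_value

    def get(self, key):
        return self.data.get(key)


mc = DictClient()
mc.add("job-push", "1")
mc.add("job-wait", "1")

# three callers take a number, in arrival order
tickets = [mc.incr("job-push") - 1 for _ in range(3)]

# each release advances the "now serving" counter by one
served = []
for _ in tickets:
    served.append(int(mc.get("job-wait")))  # ticket currently allowed to proceed
    mc.incr("job-wait")                     # what __exit__ does

print(tickets)
print(served)
```

The first caller gets a ticket equal to the initial value of the wait counter, so it proceeds immediately; every later caller waits until the releases before it have advanced the wait counter to its own number.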

Since this implementation uses Memcached, you will need to have at least one memcached process running (either locally or on a separate host).
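
A quick way to check this before entering the lock-queue is a write/read round trip. The sketch below assumes the client exposes `set(key, value)` and `get(key)` and signals an unreachable server by returning None from `get` rather than raising; `memcached_is_reachable`, the probe key, and both stub classes are hypothetical names used only for illustration.

```python
def memcached_is_reachable(mc, probe_key="lock-queue-probe"):
    """Return True if a value written through `mc` can be read back."""
    mc.set(probe_key, "1")
    return mc.get(probe_key) is not None


class WorkingStub:
    """Hypothetical stand-in for a client talking to a live server."""

    def __init__(self):
        self.data = {}

    def set(self, key, value):
        self.data[key] = value
        return True

    def get(self, key):
        return self.data.get(key)


class UnreachableStub:
    """Hypothetical stand-in for a client whose server is down."""

    def set(self, key, value):
        return False  # the write is silently dropped

    def get(self, key):
        return None   # nothing can be read back


reachable = memcached_is_reachable(WorkingStub())
unreachable = memcached_is_reachable(UnreachableStub())
print(reachable, unreachable)
```

A client that raises on connection errors would need a try/except around the round trip instead.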

This is implemented as a context manager to ensure (as far as possible) that the code holding the lock-queue will release it when it's done. If you need to be sure that the lock-queue doesn't become stuck forever, e.g. because a process died while holding the lock, have a separate process/thread monitor the queue directly via memcached and release it if it becomes stuck. This is left as an exercise for the reader ;)
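
One possible shape for such a monitor, as a rough sketch rather than a finished solution: watch the `-wait` counter and advance it once if the same ticket has been served for longer than a timeout while that ticket has actually been handed out. `release_if_stuck`, `stall_timeout`, and the `DictClient` stand-in are hypothetical, and the sketch assumes both counters already exist. Note the inherent race: if the holder is merely slow and releases right after the timeout, the queue advances twice, so the timeout should be comfortably longer than any legitimate hold.

```python
import time


def release_if_stuck(mc, key_base, stall_timeout, poll_interval=0.05):
    """Force-advance the queue once if it does not move for stall_timeout seconds.

    Returns True if the queue was advanced, False if it moved on its own or was idle.
    """
    push_key = key_base + "-push"
    wait_key = key_base + "-wait"

    stuck_on = int(mc.get(wait_key))
    deadline = time.monotonic() + stall_timeout
    while time.monotonic() < deadline:
        serving = int(mc.get(wait_key))
        next_ticket = int(mc.get(push_key))  # number the next caller would receive
        if serving != stuck_on:
            return False  # the queue moved on its own
        if serving >= next_ticket:
            return False  # nobody holds or waits for the lock
        time.sleep(poll_interval)

    mc.incr(wait_key)  # same call __exit__ would have made
    return True


class DictClient:
    """Hypothetical in-memory stand-in; not a real memcached client."""

    def __init__(self, data):
        self.data = dict(data)

    def incr(self, key):
        new_value = int(self.data[key]) + 1
        self.data[key] = str(new_value)
        return new_value

    def get(self, key):
        return self.data.get(key)


# tickets 1 and 2 were handed out; the holder of ticket 1 never released
stuck = DictClient({"job-push": "3", "job-wait": "1"})
forced = release_if_stuck(stuck, "job", stall_timeout=0.05, poll_interval=0.01)

# every ticket handed out has been released, so there is nothing to do
idle = DictClient({"job-push": "2", "job-wait": "2"})
idle_forced = release_if_stuck(idle, "job", stall_timeout=0.05, poll_interval=0.01)

print(forced, stuck.get("job-wait"), idle_forced)
```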

1 comment

Sridhar Ratnakumar 14 years, 9 months ago
idx = int(self.mc.get(self.queue_wait_key)) # int() is critical!!!
  

I got - TypeError: int() argument must be a string or a number, not 'NoneType'