Distributed lock manager provides mutex(es) over network. It is used to synchronize processes running on different machines, e.g. WSGI processes in case of web applications. Lock object is compatible with threading.Lock and can be used as a context manager ("with statement"). It can be easily modified to use UNIX sockets instead of TCP/IP. Communication protocol is text based.
First start server process:
$ chmod +x dlm.py
$ ./dlm.py
Usage:
from dlm import LockClient
client = LockClient('localhost', 27272, 'client_name')
lock = client.mkLock('lock_name')
lock.acquire()
# critical section here...
lock.release()
# using context manager
with lock:
# critical section here...
| #!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import select
import socket
import sys
import thread
import threading
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
class Lock(object):
def __init__(self, client, name):
self.name = name
self.client = client
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self):
self.client.communicate('acquire %s' % self.name)
def release(self):
self.client.communicate('release %s' % self.name)
class LockClient(object):
buffer_size = 128
def __init__(self, host, port, name):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((host, port))
self.name = name
def mkLock(self, name):
self.communicate('mklock %s' % name)
return Lock(self, name)
def mkRLock(self, name):
self.communicate('mkrlock %s' % name)
return Lock(self, name)
def reset(self):
self.communicate('reset x')
def communicate(self, data):
self._socket.send('%s %s' % (self.name, data))
self._socket.recv(self.buffer_size)
def close(self):
try:
self._socket.close()
except socket.error:
pass
class LockServer(threading.Thread):
buffer_size = 128
locks = {}
def __init__(self, _socket, address):
super(LockServer, self).__init__()
self.protocol = {
'mklock': self.mkLock,
'mkrlock': self.mkRLock,
'acquire': self.acquire,
'release': self.release,
'reset': self.reset
}
self._socket = _socket
self.setDaemon(True)
self.start()
def run(self):
try:
self.run_server()
except (EOFError, ValueError, socket.error), e:
logger.error(e)
try:
self._socket.close()
except socket.error:
pass
def run_server(self):
while True:
data = self._socket.recv(self.buffer_size)
if data in ('', '\n', '\r\n'):
return
try:
who, op, name = data.split()
except ValueError:
raise ValueError('Invalid data')
try:
fn = self.protocol[op]
except KeyError:
raise ValueError('%s: invalid operation "%s"' % (who, op))
fn(who, name)
self._socket.send(data)
def get_lock(self, who, name):
try:
return LockServer.locks[name]
except KeyError:
raise ValueError('%s: unknown lock "%s"' % (who, name))
def acquire(self, who, name):
self.get_lock(who, name).acquire(True)
logger.info('%s acquired %s', who, name)
def release(self, who, name):
try:
self.get_lock(who, name).release()
except thread.error, e:
raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))
logger.info('%s released %s', who, name)
def reset(self, who, name):
LockServer.locks = {}
logger.warning('%s reseted all locks', who, name)
def mkLock(self, who, name, lock=threading.Lock):
if name not in LockServer.locks:
LockServer.locks[name] = lock()
logger.info('%s created %s', who, name)
else:
logger.info('%s uses %s', who, name)
def mkRLock(self, who, name):
return self.mklock(who, name, lock=threading.RLock)
def serve_forever():
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
host, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
port = int(port)
except (IndexError, ValueError):
host, port = 'localhost', 27272
try:
_socket.bind((host, port))
_socket.listen(5)
logger.info('Listening on %s:%s', host, port )
while True:
inputs = [ _socket ]
inputready, outputready, exceptready = select.select(inputs, [], [])
[ LockServer(*stream.accept()) for stream in inputready ]
finally:
try:
_socket.close()
logger.info('Socket closed')
except socket.error:
pass
if __name__ == '__main__':
serve_forever()
|
I would recommend putting the basicConfig() call in the __name__ == '__main__' suite.
yeah, good idea :)
I personally don't have a use for this (but distributed locks fascinate me for some reason), but I did take the time to download and study the code because I'm still getting up-to-speed on Python idioms (and because I was curious about how easy it'd be to break). Just a little apple-polishing: the code's wonderfully written.
Hi Erik, thanks for comment :-) Was it easy to break?
I noticed a race condition in the code. Maybe GIL will prevent it, what you think?
BTW, would it be easier to implement same protocol using twisted?
Is it possible to lock on different keys? For example,
lock = client.mkLock('key_1')
lock.acquire()
should not cause
lock = client.mkLock('key_2')
lock.acquire()
to wait?
Hi John, acquiring lock A should not interfere with lock B, or have you found a bug?