Distributed lock manager provides mutex(es) over network. It is used to synchronize processes running on different machines, e.g. WSGI processes in case of web applications. Lock object is compatible with threading.Lock and can be used as a context manager ("with statement"). It can be easily modified to use UNIX sockets instead of TCP/IP. Communication protocol is text based.
First start server process:
$ chmod +x dlm.py
$ ./dlm.py
Usage:
from dlm import LockClient
client = LockClient('localhost', 27272, 'client_name')
lock = client.mkLock('lock_name')
lock.acquire()
# critical section here...
lock.release()
# using context manager
with lock:
# critical section here...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import select
import socket
import sys
import thread
import threading
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
class Lock(object):
def __init__(self, client, name):
self.name = name
self.client = client
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self):
self.client.communicate('acquire %s' % self.name)
def release(self):
self.client.communicate('release %s' % self.name)
class LockClient(object):
buffer_size = 128
def __init__(self, host, port, name):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((host, port))
self.name = name
def mkLock(self, name):
self.communicate('mklock %s' % name)
return Lock(self, name)
def mkRLock(self, name):
self.communicate('mkrlock %s' % name)
return Lock(self, name)
def reset(self):
self.communicate('reset x')
def communicate(self, data):
self._socket.send('%s %s' % (self.name, data))
self._socket.recv(self.buffer_size)
def close(self):
try:
self._socket.close()
except socket.error:
pass
class LockServer(threading.Thread):
buffer_size = 128
locks = {}
def __init__(self, _socket, address):
super(LockServer, self).__init__()
self.protocol = {
'mklock': self.mkLock,
'mkrlock': self.mkRLock,
'acquire': self.acquire,
'release': self.release,
'reset': self.reset
}
self._socket = _socket
self.setDaemon(True)
self.start()
def run(self):
try:
self.run_server()
except (EOFError, ValueError, socket.error), e:
logger.error(e)
try:
self._socket.close()
except socket.error:
pass
def run_server(self):
while True:
data = self._socket.recv(self.buffer_size)
if data in ('', '\n', '\r\n'):
return
try:
who, op, name = data.split()
except ValueError:
raise ValueError('Invalid data')
try:
fn = self.protocol[op]
except KeyError:
raise ValueError('%s: invalid operation "%s"' % (who, op))
fn(who, name)
self._socket.send(data)
def get_lock(self, who, name):
try:
return LockServer.locks[name]
except KeyError:
raise ValueError('%s: unknown lock "%s"' % (who, name))
def acquire(self, who, name):
self.get_lock(who, name).acquire(True)
logger.info('%s acquired %s', who, name)
def release(self, who, name):
try:
self.get_lock(who, name).release()
except thread.error, e:
raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))
logger.info('%s released %s', who, name)
def reset(self, who, name):
LockServer.locks = {}
logger.warning('%s reseted all locks', who, name)
def mkLock(self, who, name, lock=threading.Lock):
if name not in LockServer.locks:
LockServer.locks[name] = lock()
logger.info('%s created %s', who, name)
else:
logger.info('%s uses %s', who, name)
def mkRLock(self, who, name):
return self.mklock(who, name, lock=threading.RLock)
def serve_forever():
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
host, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
port = int(port)
except (IndexError, ValueError):
host, port = 'localhost', 27272
try:
_socket.bind((host, port))
_socket.listen(5)
logger.info('Listening on %s:%s', host, port )
while True:
inputs = [ _socket ]
inputready, outputready, exceptready = select.select(inputs, [], [])
[ LockServer(*stream.accept()) for stream in inputready ]
finally:
try:
_socket.close()
logger.info('Socket closed')
except socket.error:
pass
if __name__ == '__main__':
serve_forever()
|
I would recommend putting the basicConfig() call in the __name__ == '__main__' suite.
yeah, good idea :)
I personally don't have a use for this (but distributed locks fascinate me for some reason), but I did take the time to download and study the code because I'm still getting up-to-speed on Python idioms (and because I was curious about how easy it'd be to break). Just a little apple-polishing: the code's wonderfully written.
Hi Erik, thanks for comment :-) Was it easy to break?
I noticed a race condition in the code. Maybe GIL will prevent it, what you think?
BTW, would it be easier to implement same protocol using twisted?
Is it possible to lock on different keys? For example,
lock = client.mkLock('key_1')
lock.acquire()
should not cause
lock = client.mkLock('key_2')
lock.acquire()
to wait?
Hi John, acquiring lock A should not interfere with lock B, or have you found a bug?