Welcome, guest | Sign In | My Account | Store | Cart

Distributed lock manager provides mutex(es) over network. It is used to synchronize processes running on different machines, e.g. WSGI processes in case of web applications. Lock object is compatible with threading.Lock and can be used as a context manager ("with statement"). It can be easily modified to use UNIX sockets instead of TCP/IP. Communication protocol is text based.

First start server process:

$ chmod +x dlm.py
$ ./dlm.py

Usage:

from dlm import LockClient

client = LockClient('localhost', 27272, 'client_name')

lock = client.mkLock('lock_name')

lock.acquire()
# critical section here...
lock.release()

# using context manager
with lock:
    # critical section here...
Python, 179 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python    
# -*- coding: utf-8 -*-
                   
import logging            
import os            
import select            
import socket            
import sys      
import thread
import threading

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

class Lock(object):
    def __init__(self, client, name):
        self.name = name
        self.client = client

    def __enter__(self):
        self.acquire()
        return self
  
    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
 
    def acquire(self):
        self.client.communicate('acquire %s' % self.name)
    
    def release(self):
        self.client.communicate('release %s' % self.name)        
        

class LockClient(object):
    buffer_size = 128
    
    def __init__(self, host, port, name):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self.name = name
   
    def mkLock(self, name):
        self.communicate('mklock %s' %  name)
        return Lock(self, name)

    def mkRLock(self, name):
        self.communicate('mkrlock %s' % name)
        return Lock(self, name)
  
    def reset(self):
        self.communicate('reset x')  

    def communicate(self, data):
        self._socket.send('%s %s' % (self.name, data))
        self._socket.recv(self.buffer_size)
      
    def close(self):
        try:
            self._socket.close()
        except socket.error:
            pass


class LockServer(threading.Thread):
    buffer_size = 128
    locks = {}
    
    def __init__(self, _socket, address):
        super(LockServer, self).__init__()
        
        self.protocol = {
            'mklock': self.mkLock,
            'mkrlock': self.mkRLock,
            'acquire': self.acquire,
            'release': self.release,
            'reset': self.reset
        }
        
        self._socket = _socket
        self.setDaemon(True)
        self.start()

    def run(self):
        try:
            self.run_server()
        except (EOFError, ValueError, socket.error), e:
            logger.error(e)

        try:
            self._socket.close()
        except socket.error:
            pass

    def run_server(self):
        while True:
            data = self._socket.recv(self.buffer_size)
            
            if data in ('', '\n', '\r\n'):
                return
            
            try:
                who, op, name = data.split()
            except ValueError:
                raise ValueError('Invalid data')
            
            try:
                fn = self.protocol[op]
            except KeyError:
                raise ValueError('%s: invalid operation "%s"' % (who, op))
            
            fn(who, name)
            
            self._socket.send(data)

    def get_lock(self, who, name):
        try:
            return LockServer.locks[name]
        except KeyError:
            raise ValueError('%s: unknown lock "%s"' % (who, name))
        
    def acquire(self, who, name):
        self.get_lock(who, name).acquire(True)
        
        logger.info('%s acquired %s', who, name)
   
    def release(self, who, name):
        try:
            self.get_lock(who, name).release()
        except thread.error, e:
            raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))
        
        logger.info('%s released %s', who, name)

    def reset(self, who, name):
        LockServer.locks = {}
        
        logger.warning('%s reseted all locks', who, name)

    def mkLock(self, who, name, lock=threading.Lock):
        if name not in LockServer.locks:
            LockServer.locks[name] = lock()
            
            logger.info('%s created %s', who, name)
        else:
            logger.info('%s uses %s', who, name)
            
    def mkRLock(self, who, name):
        return self.mklock(who, name, lock=threading.RLock)


def serve_forever():
    _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        host, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
        port = int(port)
    except (IndexError, ValueError):
        host, port = 'localhost', 27272
    
    try:
        _socket.bind((host, port))
        _socket.listen(5)

        logger.info('Listening on %s:%s', host, port )

        while True:
            inputs =  [ _socket ]
            inputready, outputready, exceptready = select.select(inputs, [], [])

            [ LockServer(*stream.accept()) for stream in inputready ]
    finally:
        try:
            _socket.close()
            logger.info('Socket closed')
        except socket.error:
            pass

if __name__ == '__main__':
    serve_forever()

6 comments

Vinay Sajip 11 years, 9 months ago  # | flag

I would recommend putting the basicConfig() call in the __name__ == '__main__' suite.

pavel (author) 11 years, 9 months ago  # | flag

yeah, good idea :)

Erik Knowles 11 years, 9 months ago  # | flag

I personally don't have a use for this (but distributed locks fascinate me for some reason), but I did take the time to download and study the code because I'm still getting up-to-speed on Python idioms (and because I was curious about how easy it'd be to break). Just a little apple-polishing: the code's wonderfully written.

pavel (author) 11 years, 9 months ago  # | flag

Hi Erik, thanks for comment :-) Was it easy to break?

I noticed a race condition in the code. Maybe GIL will prevent it, what you think?

def mkLock(self, who, name, lock=threading.Lock):
    if name not in LockServer.locks:
        LockServer.locks[name] = lock()

BTW, would it be easier to implement same protocol using twisted?

John Sam 9 years, 8 months ago  # | flag

Is it possible to lock on different keys? For example,

lock = client.mkLock('key_1')

lock.acquire()

should not cause

lock = client.mkLock('key_2')

lock.acquire()

to wait?

pavel (author) 9 years, 8 months ago  # | flag

Hi John, acquiring lock A should not interfere with lock B, or have you found a bug?