#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import select
import socket
import sys
import thread
import threading
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
class Lock(object):
def __init__(self, client, name):
self.name = name
self.client = client
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self):
self.client.communicate('acquire %s' % self.name)
def release(self):
self.client.communicate('release %s' % self.name)
class LockClient(object):
buffer_size = 128
def __init__(self, host, port, name):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((host, port))
self.name = name
def mkLock(self, name):
self.communicate('mklock %s' % name)
return Lock(self, name)
def mkRLock(self, name):
self.communicate('mkrlock %s' % name)
return Lock(self, name)
def reset(self):
self.communicate('reset x')
def communicate(self, data):
self._socket.send('%s %s' % (self.name, data))
self._socket.recv(self.buffer_size)
def close(self):
try:
self._socket.close()
except socket.error:
pass
class LockServer(threading.Thread):
buffer_size = 128
locks = {}
def __init__(self, _socket, address):
super(LockServer, self).__init__()
self.protocol = {
'mklock': self.mkLock,
'mkrlock': self.mkRLock,
'acquire': self.acquire,
'release': self.release,
'reset': self.reset
}
self._socket = _socket
self.setDaemon(True)
self.start()
def run(self):
try:
self.run_server()
except (EOFError, ValueError, socket.error), e:
logger.error(e)
try:
self._socket.close()
except socket.error:
pass
def run_server(self):
while True:
data = self._socket.recv(self.buffer_size)
if data in ('', '\n', '\r\n'):
return
try:
who, op, name = data.split()
except ValueError:
raise ValueError('Invalid data')
try:
fn = self.protocol[op]
except KeyError:
raise ValueError('%s: invalid operation "%s"' % (who, op))
fn(who, name)
self._socket.send(data)
def get_lock(self, who, name):
try:
return LockServer.locks[name]
except KeyError:
raise ValueError('%s: unknown lock "%s"' % (who, name))
def acquire(self, who, name):
self.get_lock(who, name).acquire(True)
logger.info('%s acquired %s', who, name)
def release(self, who, name):
try:
self.get_lock(who, name).release()
except thread.error, e:
raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))
logger.info('%s released %s', who, name)
def reset(self, who, name):
LockServer.locks = {}
logger.warning('%s reseted all locks', who, name)
def mkLock(self, who, name, lock=threading.Lock):
if name not in LockServer.locks:
LockServer.locks[name] = lock()
logger.info('%s created %s', who, name)
else:
logger.info('%s uses %s', who, name)
def mkRLock(self, who, name):
return self.mklock(who, name, lock=threading.RLock)
def serve_forever():
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
host, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
port = int(port)
except (IndexError, ValueError):
host, port = 'localhost', 27272
try:
_socket.bind((host, port))
_socket.listen(5)
logger.info('Listening on %s:%s', host, port )
while True:
inputs = [ _socket ]
inputready, outputready, exceptready = select.select(inputs, [], [])
[ LockServer(*stream.accept()) for stream in inputready ]
finally:
try:
_socket.close()
logger.info('Socket closed')
except socket.error:
pass
if __name__ == '__main__':
serve_forever()
Diff to Previous Revision
--- revision 1 2012-07-04 20:56:08
+++ revision 2 2012-07-04 21:03:32
@@ -5,10 +5,9 @@
import os
import select
import socket
-import sys
-import threading
+import sys
import thread
-import time
+import threading
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
@@ -137,16 +136,16 @@
logger.warning('%s reseted all locks', who, name)
- def mkLock(self, who, name, Lock=threading.Lock):
+ def mkLock(self, who, name, lock=threading.Lock):
if name not in LockServer.locks:
- LockServer.locks[name] = Lock()
+ LockServer.locks[name] = lock()
logger.info('%s created %s', who, name)
else:
logger.info('%s uses %s', who, name)
def mkRLock(self, who, name):
- return self.mklock(who, name, Lock=threading.RLock)
+ return self.mklock(who, name, lock=threading.RLock)
def serve_forever():