Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python    
# -*- coding: utf-8 -*-
                   
import logging            
import os            
import select            
import socket            
import sys      
import thread
import threading

logging
.basicConfig(level=logging.INFO)
logger
= logging.getLogger()

class Lock(object):
   
def __init__(self, client, name):
       
self.name = name
       
self.client = client

   
def __enter__(self):
       
self.acquire()
       
return self
 
   
def __exit__(self, exc_type, exc_value, traceback):
       
self.release()
 
   
def acquire(self):
       
self.client.communicate('acquire %s' % self.name)
   
   
def release(self):
       
self.client.communicate('release %s' % self.name)        
       

class LockClient(object):
    buffer_size
= 128
   
   
def __init__(self, host, port, name):
       
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
self._socket.connect((host, port))
       
self.name = name
   
   
def mkLock(self, name):
       
self.communicate('mklock %s' %  name)
       
return Lock(self, name)

   
def mkRLock(self, name):
       
self.communicate('mkrlock %s' % name)
       
return Lock(self, name)
 
   
def reset(self):
       
self.communicate('reset x')  

   
def communicate(self, data):
       
self._socket.send('%s %s' % (self.name, data))
       
self._socket.recv(self.buffer_size)
     
   
def close(self):
       
try:
           
self._socket.close()
       
except socket.error:
           
pass


class LockServer(threading.Thread):
    buffer_size
= 128
    locks
= {}
   
   
def __init__(self, _socket, address):
       
super(LockServer, self).__init__()
       
       
self.protocol = {
           
'mklock': self.mkLock,
           
'mkrlock': self.mkRLock,
           
'acquire': self.acquire,
           
'release': self.release,
           
'reset': self.reset
       
}
       
       
self._socket = _socket
       
self.setDaemon(True)
       
self.start()

   
def run(self):
       
try:
           
self.run_server()
       
except (EOFError, ValueError, socket.error), e:
            logger
.error(e)

       
try:
           
self._socket.close()
       
except socket.error:
           
pass

   
def run_server(self):
       
while True:
            data
= self._socket.recv(self.buffer_size)
           
           
if data in ('', '\n', '\r\n'):
               
return
           
           
try:
                who
, op, name = data.split()
           
except ValueError:
               
raise ValueError('Invalid data')
           
           
try:
                fn
= self.protocol[op]
           
except KeyError:
               
raise ValueError('%s: invalid operation "%s"' % (who, op))
           
            fn
(who, name)
           
           
self._socket.send(data)

   
def get_lock(self, who, name):
       
try:
           
return LockServer.locks[name]
       
except KeyError:
           
raise ValueError('%s: unknown lock "%s"' % (who, name))
       
   
def acquire(self, who, name):
       
self.get_lock(who, name).acquire(True)
       
        logger
.info('%s acquired %s', who, name)
   
   
def release(self, who, name):
       
try:
           
self.get_lock(who, name).release()
       
except thread.error, e:
           
raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))
       
        logger
.info('%s released %s', who, name)

   
def reset(self, who, name):
       
LockServer.locks = {}
       
        logger
.warning('%s reseted all locks', who, name)

   
def mkLock(self, who, name, lock=threading.Lock):
       
if name not in LockServer.locks:
           
LockServer.locks[name] = lock()
           
            logger
.info('%s created %s', who, name)
       
else:
            logger
.info('%s uses %s', who, name)
           
   
def mkRLock(self, who, name):
       
return self.mklock(who, name, lock=threading.RLock)


def serve_forever():
    _socket
= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   
   
try:
        host
, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
        port
= int(port)
   
except (IndexError, ValueError):
        host
, port = 'localhost', 27272
   
   
try:
        _socket
.bind((host, port))
        _socket
.listen(5)

        logger
.info('Listening on %s:%s', host, port )

       
while True:
            inputs
=  [ _socket ]
            inputready
, outputready, exceptready = select.select(inputs, [], [])

           
[ LockServer(*stream.accept()) for stream in inputready ]
   
finally:
       
try:
            _socket
.close()
            logger
.info('Socket closed')
       
except socket.error:
           
pass

if __name__ == '__main__':
    serve_forever
()

Diff to Previous Revision

--- revision 1 2012-07-04 20:56:08
+++ revision 2 2012-07-04 21:03:32
@@ -5,10 +5,9 @@
 
import os            
 
import select            
 
import socket            
-import sys            
-import threading            
+import sys      
 
import thread
-import time
+import threading
 
 logging
.basicConfig(level=logging.INFO)
 logger
= logging.getLogger()
@@ -137,16 +136,16 @@
         
         logger
.warning('%s reseted all locks', who, name)
 
-    def mkLock(self, who, name, Lock=threading.Lock):
+    def mkLock(self, who, name, lock=threading.Lock):
         
if name not in LockServer.locks:
-            LockServer.locks[name] = Lock()
+            LockServer.locks[name] = lock()
             
             logger
.info('%s created %s', who, name)
         
else:
             logger
.info('%s uses %s', who, name)
             
     
def mkRLock(self, who, name):
-        return self.mklock(who, name, Lock=threading.RLock)
+        return self.mklock(who, name, lock=threading.RLock)
 
 
 
def serve_forever():

History