"""POSIX (FreeBSD) semaphore bindings.""" from __future__ import with_statement from ctypes import Structure, POINTER, byref from ctypes import c_char, c_char_p, c_int, c_size_t, c_uint, c_int32, c_uint32 c_int_p = POINTER(c_int) EINTR = 4 # Interrupted system call EEXIST = 17 # Resource temporarily unavailable EAGAIN = 35 # File exists ETIMEDOUT = 60 # Operation timed out def get_func(libname, funcname, restype=None, argtypes=()): """Retrieve a function from a library, and set the data types.""" from ctypes import cdll lib = cdll.LoadLibrary(libname) func = getattr(lib, funcname) func.argtypes = argtypes func.restype = restype return func class SemError(RuntimeError): """Exception for errors raised by the Sem class.""" _error = get_func("libc.so", "__error", POINTER(c_int), ()) _strerror = get_func("libc.so", "strerror", c_char_p, (c_int,)) def __init__(self): """Create an exception based on the value of errno""" self.errno = self._error().contents.value RuntimeError.__init__(self, self.errno, self._strerror(self.errno)) class c_struct__usem(Structure): """FreeBSD provate semaphore structure.""" _fields_ = (("_has_waiters", c_uint32), ("_count", c_uint32), ("_flags", c_uint32)) class c_sem(Structure): """Semaphore structure.""" _fields_ = (("_magic", c_uint32), ("_kern", c_struct__usem)) c_sem_p = POINTER(c_sem) class c_struct_timespec(Structure): """timespec function.""" _fields_ = (("tv_sec", c_int32), ("tv_nsec", c_int32)) c_struct_timespec_p = POINTER(c_struct_timespec) class Sem(object): """A POSIC semaphore.""" O_CREAT = 0x0200 # create if nonexistent O_EXCL = 0x0800 # error if already exists _clock_gettime = get_func("libc.so", "clock_gettime", None, (c_int32, c_struct_timespec_p)) _error = get_func("libc.so", "__error", POINTER(c_int), ()) _sem_close = get_func("libc.so", "sem_close", c_int, (c_sem_p,)) _sem_destroy = get_func("libc.so", "sem_destroy", c_int, (c_sem_p,)) _sem_getvalue = get_func("libc.so", "sem_getvalue", c_int, (c_sem_p, c_int_p)) _sem_init = get_func("libc.so", "sem_init", c_int, (c_sem_p, c_int, c_uint)) _sem_open = get_func("libc.so", "sem_open", c_sem_p, (c_char_p, c_int)) _sem_post = get_func("libc.so", "sem_post", c_int, (c_sem_p,)) _sem_timedwait = get_func("libc.so", "sem_timedwait", c_int, (c_sem_p, c_struct_timespec_p)) _sem_trywait = get_func("libc.so", "sem_trywait", c_int, (c_sem_p,)) _sem_unlink = get_func("libc.so", "sem_unlink", c_int, (c_char_p,)) _sem_wait = get_func("libc.so", "sem_wait", c_int, (c_sem_p,)) def __init__(self, value=1, name=None, oflags=0x0200): """Create a semaphore with initial value, possible name and flags.""" if name: self.sem = None self.sem_p = self._sem_open("/%s" % name, oflags) if not self.sem_p: raise SemError() else: self.sem = c_sem() self.sem_p = c_sem_p(self.sem) if self._sem_init(self.sem_p, 0, value): raise SemError() def __del__(self): """Cleanup the semaphore.""" if self.sem is None: res = self._sem_close(self.sem_p) else: res = self._sem_destroy(self.sem_p) if res: raise SemError() def __len__(self): """The current value of the semaphore.""" return self.getvalue() def __enter__(self): """Decrement the value of the semaphore, waiting if required.""" self.wait() return self def __exit__(self, exc_type, exc_value, traceback): """Increment the value of the semaphore.""" self.post() def acquire(self, blocking=True): """Decrement the value of the semaphore, block if requested and required.""" if blocking: self.wait() return True else: return self.trywait() def release(self): """Increment the value of the semaphore.""" self.post() def getvalue(self): """The current value of the semaphore.""" from ctypes import c_int, byref sval = c_int() if self._sem_getvalue(self.sem_p, byref(sval)): raise SemError() return sval.value def post(self): """Increment the value of the semaphore.""" if self._sem_post(self.sem_p): raise SemError() def timedwait(self, timeout=0, no_eintr=True): """Decrement the value of the semaphore, waiting up to timeout if required.""" timespec = c_struct_timespec() self._clock_gettime(0, byref(timespec)) sec = int(timeout) nsec = int((timeout - sec) * 1000000000) timespec.tv_sec += sec timespec.tv_nsec += nsec if timespec.tv_nsec >= 1000000000: timespec.tv_sec += 1 timespec.tv_nsec -= 1000000000 while True: if self._sem_timedwait(self.sem_p, byref(timespec)): if self._error().contents.value == ETIMEDOUT: return False elif not no_eintr or self._error().contents.value != EINTR: raise SemError() else: return True def trywait(self): """Decrement the value of the semaphore if possible.""" if self._sem_trywait(self.sem_p): if self._error().contents.value == EAGAIN: return False else: raise SemError() return True @staticmethod def unlink(name): """Remove a named semaphore.""" if Sem._sem_unlink("/%s" % name): raise SemError() def wait(self, timeout=None, no_eintr=True): """Increment the value of the semaphore, wait up to timeout or wait indefinitly.""" if timeout: return self.timedwait(timeout, no_eintr) while True: if self._sem_wait(self.sem_p): if not no_eintr or self._error().contents.value != EINTR: raise SemError() else: return