Welcome, guest | Sign In | My Account | Store | Cart
"""POSIX (FreeBSD) semaphore bindings."""
from __future__ import with_statement

from ctypes import Structure, POINTER, byref
from ctypes import c_char, c_char_p, c_int, c_size_t, c_uint, c_int32, c_uint32
from ctypes import c_long
# Pointer-to-int type; used below as the output argument type of sem_getvalue().
c_int_p = POINTER(c_int)

# errno values tested by the Sem methods (numeric values as on FreeBSD).
EINTR = 4         # Interrupted system call
EEXIST = 17       # File exists
EAGAIN = 35       # Resource temporarily unavailable
ETIMEDOUT = 60    # Operation timed out

def get_func(libname, funcname, restype=None, argtypes=()):
    """Look up ``funcname`` in the shared library ``libname``.

    The library is loaded through ``ctypes.cdll`` and the function object
    that is returned has its ``argtypes`` and ``restype`` attributes set from
    the arguments of the same name.
    """
    import ctypes

    function = getattr(ctypes.cdll.LoadLibrary(libname), funcname)
    function.argtypes = argtypes
    function.restype = restype
    return function

class SemError(RuntimeError):
    """Exception raised when a semaphore library call fails.

    The exception is built from the value of errno at construction time, so
    it must be instantiated right after the failing call.  ``args`` is
    ``(errno, message)`` and the number is also available as the ``errno``
    attribute.
    """
    # __error() returns a pointer to errno; dereferencing it yields the value.
    _error = get_func("libc.so", "__error", POINTER(c_int), ())
    _strerror = get_func("libc.so", "strerror", c_char_p, (c_int,))

    def __init__(self):
        """Create an exception based on the current value of errno."""
        self.errno = self._error().contents.value
        message = self._strerror(self.errno)
        # A c_char_p result is a byte string; on Python 3 that is not ``str``,
        # so decode it to keep the exception message readable text.  On
        # Python 2 ``bytes`` is ``str`` and the message is left untouched.
        if isinstance(message, bytes) and not isinstance(message, str):
            message = message.decode("utf-8", "replace")
        RuntimeError.__init__(self, self.errno, message)

class c_struct__usem(Structure):
    """FreeBSD private semaphore structure (three consecutive 32-bit words)."""
    _fields_ = (
        ("_has_waiters", c_uint32),
        ("_count", c_uint32),
        ("_flags", c_uint32),
    )

class c_sem(Structure):
    """Semaphore structure: a ``_magic`` word followed by an embedded
    ``c_struct__usem``."""
    _fields_ = (
        ("_magic", c_uint32),
        ("_kern", c_struct__usem),
    )
# Pointer type used for the semaphore argument of the sem_* functions.
c_sem_p = POINTER(c_sem)

class c_struct_timespec(Structure):
    """C ``struct timespec``: a time expressed as seconds plus nanoseconds.

    In C ``tv_sec`` is a ``time_t`` and ``tv_nsec`` is a ``long``.  Both are
    mapped to ``c_long`` so the structure has its native size on 64-bit
    platforms as well as 32-bit ones; with fixed 32-bit fields the structure
    was only 8 bytes there and clock_gettime() wrote past its end.

    NOTE(review): a platform with a 32-bit ``long`` but a 64-bit ``time_t``
    would need ``tv_sec`` widened further -- confirm for the targets in use.
    """
    _fields_ = (("tv_sec", c_long),
                ("tv_nsec", c_long))
# Pointer type used for the timespec argument of clock_gettime/sem_timedwait.
c_struct_timespec_p = POINTER(c_struct_timespec)

class Sem(object):
    """A POSIX semaphore.

    Without a ``name`` an unnamed semaphore is created with sem_init() (with
    ``pshared`` set to 0) and destroyed with sem_destroy().  With a ``name``
    the named semaphore "/<name>" is opened with sem_open() and closed with
    sem_close(); use ``Sem.unlink(name)`` to remove it.

    Instances work as context managers (wait on entry, post on exit) and
    offer ``acquire``/``release`` aliases.  Failing library calls raise
    ``SemError``.
    """
    O_CREAT = 0x0200  # create if nonexistent
    O_EXCL  = 0x0800  # error if already exists
    DEFAULT_MODE = 0x180  # octal 0600: read/write for the owner only

    _clock_gettime = get_func("libc.so", "clock_gettime", None, (c_int32, c_struct_timespec_p))

    _error = get_func("libc.so", "__error", POINTER(c_int), ())

    _sem_close = get_func("libc.so", "sem_close", c_int, (c_sem_p,))
    _sem_destroy = get_func("libc.so", "sem_destroy", c_int, (c_sem_p,))
    _sem_getvalue = get_func("libc.so", "sem_getvalue", c_int, (c_sem_p, c_int_p))
    _sem_init = get_func("libc.so", "sem_init", c_int, (c_sem_p, c_int, c_uint))
    # sem_open() is variadic: only the two fixed parameters are declared here,
    # the optional mode and value are passed as extra arguments at call time.
    _sem_open = get_func("libc.so", "sem_open", c_sem_p, (c_char_p, c_int))
    _sem_post = get_func("libc.so", "sem_post", c_int, (c_sem_p,))
    _sem_timedwait = get_func("libc.so", "sem_timedwait", c_int, (c_sem_p, c_struct_timespec_p))
    _sem_trywait = get_func("libc.so", "sem_trywait", c_int, (c_sem_p,))
    _sem_unlink = get_func("libc.so", "sem_unlink", c_int, (c_char_p,))
    _sem_wait = get_func("libc.so", "sem_wait", c_int, (c_sem_p,))

    @staticmethod
    def _path(name):
        """Return the semaphore path "/<name>" as a byte string.

        The sem_open/sem_unlink bindings take ``c_char_p``, which only
        accepts byte strings; text names are encoded as UTF-8.
        """
        if isinstance(name, bytes):
            return b"/" + name
        path = "/%s" % name
        if not isinstance(path, bytes):
            path = path.encode("utf-8")
        return path

    def __init__(self, value=1, name=None, oflags=O_CREAT, mode=DEFAULT_MODE):
        """Create a semaphore with initial value, possible name and flags.

        value  -- initial value (used for unnamed semaphores and for named
                  semaphores that get created by this call).
        name   -- if given (and non-empty), open the named semaphore
                  "/<name>"; otherwise create an unnamed one.
        oflags -- flags for sem_open(), e.g. ``Sem.O_CREAT | Sem.O_EXCL``.
        mode   -- permission bits handed to sem_open() for a named semaphore.

        Raises SemError if the semaphore cannot be opened or initialised.
        """
        # Define both attributes before any call that can fail, so that
        # __del__ never sees a half-constructed object.
        self.sem = None
        self.sem_p = None
        if name:
            # With O_CREAT sem_open() reads a mode and an initial value from
            # its variable arguments; always supply them so they are defined.
            sem_p = self._sem_open(self._path(name), oflags, mode, value)
            if not sem_p:
                raise SemError()
            self.sem_p = sem_p
        else:
            sem = c_sem()
            sem_p = c_sem_p(sem)
            if self._sem_init(sem_p, 0, value):
                raise SemError()
            # Keep the structure itself alive for as long as the pointer.
            self.sem = sem
            self.sem_p = sem_p

    def __del__(self):
        """Cleanup the semaphore.

        Named semaphores are closed, unnamed ones destroyed.  Nothing is done
        when construction failed.  Errors are not raised: an exception
        escaping a finalizer cannot be caught by any caller.
        """
        sem_p = getattr(self, "sem_p", None)
        if not sem_p:
            return
        self.sem_p = None
        if getattr(self, "sem", None) is None:
            self._sem_close(sem_p)
        else:
            self._sem_destroy(sem_p)

    def __len__(self):
        """The current value of the semaphore."""
        return self.getvalue()

    def __enter__(self):
        """Decrement the value of the semaphore, waiting if required."""
        self.wait()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Increment the value of the semaphore."""
        self.post()

    def acquire(self, blocking=True):
        """Decrement the value of the semaphore, block if requested and
        required.

        Returns True when the semaphore was decremented, False when
        ``blocking`` is false and it could not be decremented immediately.
        """
        if blocking:
            self.wait()
            return True
        return self.trywait()

    def release(self):
        """Increment the value of the semaphore."""
        self.post()

    def getvalue(self):
        """The current value of the semaphore.

        Raises SemError if sem_getvalue() fails.
        """
        sval = c_int()
        if self._sem_getvalue(self.sem_p, byref(sval)):
            raise SemError()
        return sval.value

    def post(self):
        """Increment the value of the semaphore.

        Raises SemError if sem_post() fails.
        """
        if self._sem_post(self.sem_p):
            raise SemError()

    def timedwait(self, timeout=0, no_eintr=True):
        """Decrement the value of the semaphore, waiting up to timeout if
        required.

        timeout  -- maximum time to wait, in seconds (may be fractional).
        no_eintr -- when true, a wait interrupted by a signal (EINTR) is
                    retried against the same deadline instead of raising.

        Returns True if the semaphore was decremented, False on timeout.
        Raises SemError on any other failure.
        """
        now = c_struct_timespec()
        self._clock_gettime(0, byref(now))  # clock id 0: CLOCK_REALTIME on FreeBSD
        whole = int(timeout)
        nsec_total = now.tv_nsec + int((timeout - whole) * 1000000000)
        # divmod keeps tv_nsec within [0, 1e9) whatever the sign of the
        # fractional part, carrying the excess (or deficit) into the seconds.
        carry, nsec = divmod(nsec_total, 1000000000)
        deadline = c_struct_timespec(now.tv_sec + whole + carry, nsec)
        while True:
            if not self._sem_timedwait(self.sem_p, byref(deadline)):
                return True
            err = self._error().contents.value
            if err == ETIMEDOUT:
                return False
            if not no_eintr or err != EINTR:
                raise SemError()

    def trywait(self):
        """Decrement the value of the semaphore if possible.

        Returns True on success, False if the call would have blocked
        (EAGAIN).  Raises SemError on any other failure.
        """
        if self._sem_trywait(self.sem_p):
            if self._error().contents.value == EAGAIN:
                return False
            raise SemError()
        return True

    @staticmethod
    def unlink(name):
        """Remove the named semaphore "/<name>".

        Raises SemError if sem_unlink() fails.
        """
        if Sem._sem_unlink(Sem._path(name)):
            raise SemError()

    def wait(self, timeout=None, no_eintr=True):
        """Decrement the value of the semaphore, wait up to timeout or wait
        indefinitely.

        timeout  -- seconds to wait; ``None`` or ``0`` means wait
                    indefinitely.
        no_eintr -- when true, a wait interrupted by a signal (EINTR) is
                    retried instead of raising.

        With a timeout the result of ``timedwait`` is returned (True or
        False); without one the method returns None once the semaphore has
        been decremented.  Raises SemError on failure.
        """
        if timeout:
            return self.timedwait(timeout, no_eintr)
        while True:
            if not self._sem_wait(self.sem_p):
                return
            if not no_eintr or self._error().contents.value != EINTR:
                raise SemError()

History