Welcome, guest | Sign In | My Account | Store | Cart

POSIX Semaphore bindings for FreeBSD.

Python, 180 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""POSIX (FreeBSD) semaphore bindings."""
from __future__ import with_statement

from ctypes import Structure, POINTER, byref
from ctypes import c_char, c_char_p, c_int, c_long, c_size_t, c_uint, c_int32, c_uint32
# Pointer-to-int type, used for the output argument of sem_getvalue().
c_int_p = POINTER(c_int)

# errno values checked by this module (FreeBSD numbering).
EINTR = 4         # Interrupted system call
EEXIST = 17       # File exists
EAGAIN = 35       # Resource temporarily unavailable
ETIMEDOUT = 60    # Operation timed out

def get_func(libname, funcname, restype=None, argtypes=()):
    """Look up a C function in a shared library and declare its prototype.

    libname  -- library name handed to ctypes' cdll.LoadLibrary().
    funcname -- name of the exported symbol to fetch from that library.
    restype  -- ctypes type of the return value (None for no return value).
    argtypes -- sequence of ctypes types, one per argument.

    Returns the ctypes function object with restype and argtypes assigned.
    """
    from ctypes import cdll

    # Attribute lookup on the loaded library resolves the symbol.
    c_function = getattr(cdll.LoadLibrary(libname), funcname)
    c_function.restype = restype
    c_function.argtypes = argtypes
    return c_function

class SemError(RuntimeError):
    """Error raised by the Sem class when a libc semaphore call fails.

    The exception captures the C errno at construction time: it is stored
    in the ``errno`` attribute, and the exception arguments are the pair
    (errno, strerror(errno)).
    """
    # libc's strerror(): maps an errno value to its message text.
    _strerror = get_func("libc.so", "strerror", c_char_p, (c_int,))
    # libc's __error(): returns a pointer to an int, read here as errno.
    _error = get_func("libc.so", "__error", POINTER(c_int), ())

    def __init__(self):
        """Build the exception from the current value of errno."""
        # Read errno before making any further libc call.
        code = self._error().contents.value
        self.errno = code
        message = self._strerror(code)
        RuntimeError.__init__(self, code, message)

class c_struct__usem(Structure):
    """FreeBSD private semaphore structure.

    Three consecutive unsigned 32-bit members; the order must stay as
    declared because ctypes lays the fields out in this sequence.
    """
    _fields_ = (
        ("_has_waiters", c_uint32),
        ("_count", c_uint32),
        ("_flags", c_uint32),
    )

class c_sem(Structure):
    """Semaphore structure handed to the sem_*() functions.

    A 32-bit magic word followed by the embedded private semaphore
    structure (c_struct__usem).
    """
    _fields_ = (
        ("_magic", c_uint32),
        ("_kern", c_struct__usem),
    )


# Pointer type used as the semaphore argument of every sem_*() prototype.
c_sem_p = POINTER(c_sem)

class c_struct_timespec(Structure):
    """C ``struct timespec``: a time value as seconds plus nanoseconds.

    Both members are declared as C ``long`` rather than a fixed 32-bit
    integer.  ``tv_nsec`` is a ``long`` in C, and ``long`` has the same width
    as ``time_t`` on FreeBSD i386 (32 bits) and on the 64-bit platforms
    (64 bits).  With 32-bit fields on a 64-bit system the structure is only
    8 bytes, so clock_gettime() writes past its end and sem_timedwait()
    reads a garbage deadline.

    NOTE(review): on 32-bit platforms whose time_t is 64 bits wide, tv_sec
    would still be too narrow -- confirm if such a target matters.
    """
    _fields_ = (("tv_sec", c_long),
                ("tv_nsec", c_long))


# Pointer type used for the timespec argument of the libc prototypes.
c_struct_timespec_p = POINTER(c_struct_timespec)

class Sem(object):
    """A POSIX semaphore.

    With a ``name`` the semaphore is opened (and by default created) via
    sem_open(); without one an unnamed, process-private semaphore is set up
    with sem_init().  The object can be used as a context manager (wait on
    entry, post on exit) and offers acquire()/release() wrappers.

    Every failing libc call raises SemError, except for the "would block"
    and "timed out" outcomes, which are reported through return values.
    """
    O_CREAT = 0x0200  # create if nonexistent
    O_EXCL  = 0x0800  # error if already exists

    CLOCK_REALTIME = 0  # clock id passed to clock_gettime() for deadlines

    _clock_gettime = get_func("libc.so", "clock_gettime", c_int, (c_int32, c_struct_timespec_p))

    _error = get_func("libc.so", "__error", POINTER(c_int), ())

    _sem_close = get_func("libc.so", "sem_close", c_int, (c_sem_p,))
    _sem_destroy = get_func("libc.so", "sem_destroy", c_int, (c_sem_p,))
    _sem_getvalue = get_func("libc.so", "sem_getvalue", c_int, (c_sem_p, c_int_p))
    _sem_init = get_func("libc.so", "sem_init", c_int, (c_sem_p, c_int, c_uint))
    # sem_open() is variadic: when O_CREAT is set it also reads a mode and an
    # unsigned initial value.  Declare and always pass both, so they are never
    # left as garbage; they are ignored when O_CREAT is not set.
    _sem_open = get_func("libc.so", "sem_open", c_sem_p, (c_char_p, c_int, c_uint, c_uint))
    _sem_post = get_func("libc.so", "sem_post", c_int, (c_sem_p,))
    _sem_timedwait = get_func("libc.so", "sem_timedwait", c_int, (c_sem_p, c_struct_timespec_p))
    _sem_trywait = get_func("libc.so", "sem_trywait", c_int, (c_sem_p,))
    _sem_unlink = get_func("libc.so", "sem_unlink", c_int, (c_char_p,))
    _sem_wait = get_func("libc.so", "sem_wait", c_int, (c_sem_p,))

    def __init__(self, value=1, name=None, oflags=O_CREAT, mode=0x180):
        """Create a semaphore with initial value, possible name and flags.

        value  -- initial count (used by sem_init(), and by sem_open() when
                  the semaphore is created).
        name   -- if given, open the named semaphore "/<name>"; otherwise
                  create an unnamed semaphore owned by this object.
        oflags -- flags for sem_open() (O_CREAT and/or O_EXCL).
        mode   -- permission bits for a newly created named semaphore;
                  the default 0x180 is octal 0600.

        Raises SemError if sem_open() or sem_init() fails.
        """
        # Set both attributes up front so __del__ is safe even when
        # construction fails part-way.
        self.sem = None
        self.sem_p = None
        if name:
            handle = self._sem_open("/%s" % name, oflags, mode, value)
            if not handle:
                raise SemError()
            self.sem_p = handle
        else:
            storage = c_sem()
            handle = c_sem_p(storage)
            if self._sem_init(handle, 0, value):
                raise SemError()
            # Publish only after sem_init() succeeded, so __del__ never
            # destroys a semaphore that was not initialised.
            self.sem = storage
            self.sem_p = handle

    def __del__(self):
        """Cleanup the semaphore.

        Named semaphores are closed, unnamed ones destroyed.  Nothing is
        done when construction never produced a valid handle.
        """
        handle = getattr(self, "sem_p", None)
        if not handle:
            return
        # Drop the handle first so the cleanup can never run twice.
        self.sem_p = None
        if getattr(self, "sem", None) is None:
            res = self._sem_close(handle)
        else:
            res = self._sem_destroy(handle)
        if res:
            raise SemError()

    def __len__(self):
        """The current value of the semaphore."""
        return self.getvalue()

    def __enter__(self):
        """Decrement the value of the semaphore, waiting if required."""
        self.wait()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Increment the value of the semaphore."""
        self.post()

    def acquire(self, blocking=True):
        """Decrement the value of the semaphore, block if requested and
        required.

        Returns True when the semaphore was acquired, and False when
        blocking is false and it could not be acquired immediately.
        """
        if blocking:
            self.wait()
            return True
        return self.trywait()

    def release(self):
        """Increment the value of the semaphore."""
        self.post()

    def getvalue(self):
        """The current value of the semaphore.

        Raises SemError if sem_getvalue() fails.
        """
        sval = c_int()
        if self._sem_getvalue(self.sem_p, byref(sval)):
            raise SemError()
        return sval.value

    def post(self):
        """Increment the value of the semaphore.

        Raises SemError if sem_post() fails.
        """
        if self._sem_post(self.sem_p):
            raise SemError()

    def timedwait(self, timeout=0, no_eintr=True):
        """Decrement the value of the semaphore, waiting up to timeout if
        required.

        timeout  -- seconds to wait (may be fractional).
        no_eintr -- when true, retry the wait if it is interrupted (EINTR)
                    instead of raising.

        Returns True if the semaphore was decremented, False on timeout.
        Raises SemError on any other failure.
        """
        deadline = c_struct_timespec()
        if self._clock_gettime(self.CLOCK_REALTIME, byref(deadline)):
            raise SemError()
        whole = int(timeout)
        frac_ns = int((timeout - whole) * 1000000000)
        # divmod keeps tv_nsec inside [0, 1e9) for both overflow and a
        # negative fractional part; an out-of-range value would be rejected.
        carry, nsec = divmod(deadline.tv_nsec + frac_ns, 1000000000)
        deadline.tv_sec += whole + carry
        deadline.tv_nsec = nsec
        # The deadline is absolute, so a retry after EINTR does not extend it.
        while self._sem_timedwait(self.sem_p, byref(deadline)):
            err = self._error().contents.value
            if err == ETIMEDOUT:
                return False
            if not no_eintr or err != EINTR:
                raise SemError()
        return True

    def trywait(self):
        """Decrement the value of the semaphore if possible.

        Returns True on success, False if the call would have blocked
        (EAGAIN).  Raises SemError on any other failure.
        """
        if self._sem_trywait(self.sem_p):
            if self._error().contents.value == EAGAIN:
                return False
            raise SemError()
        return True

    @staticmethod
    def unlink(name):
        """Remove a named semaphore.

        Raises SemError if sem_unlink() fails.
        """
        if Sem._sem_unlink("/%s" % name):
            raise SemError()

    def wait(self, timeout=None, no_eintr=True):
        """Decrement the value of the semaphore, wait up to timeout or wait
        indefinitely.

        timeout  -- None to wait without limit; otherwise seconds to wait.
                    A timeout of 0 polls without blocking.
        no_eintr -- when true, retry the wait if it is interrupted (EINTR)
                    instead of raising.

        With a timeout, returns the result of timedwait() (True/False).
        Without one, returns None once the semaphore has been decremented.
        Raises SemError on failure.
        """
        # Compare against None: a timeout of 0 is a real (immediate) deadline,
        # not a request to block forever.
        if timeout is not None:
            return self.timedwait(timeout, no_eintr)
        while self._sem_wait(self.sem_p):
            if not no_eintr or self._error().contents.value != EINTR:
                raise SemError()

The POSIX semaphore post function (sem_post) is the only one of the thread synchronization functions (such as the mutex functions) that may be called from a program interrupt (signal) handler.

NOTE: Python's own semaphores are implemented using a mutex (Lock), and Python handles interrupts asynchronously, so any code that needs to call sem_post from a signal handler must be written directly in C, and that handler must be registered with signal(3) from C.