# http://code.activestate.com/recipes/577395-multiprocess-safe-logging-file-handler/
#
# Copyright (c) 2010 Jan Kaliszewski (zuo). Licensed under the PSF License.
#
# MultiprocessRLock acquire()/release() methods patterned, to some extent,
# after threading.RLock acquire()/release() of the Python standard library.

"""
Multiprocess-safe logging and interprocess locking classes.

A Python 2.x/3.x-compatible multiprocess-safe logging file handler
(logging.FileHandler replacement, designed for logging to a single file from
multiple independent processes) together with a simple interprocess RLock.

The module contains:
* universal abstract classes:
  MultiprocessRLock, MultiprocessFileHandler, LockedFileHandler,
* Unix/Linux-only example implementation (with flock-based locking):
  FLockRLock and FLockFileHandler classes.

Tested under Debian GNU/Linux, with Python 2.4, 2.5, 2.6 and 3.1.
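
Typical usage (an illustrative sketch, not part of the original recipe;
the file name 'app.log' and the log format are arbitrary examples):

    import logging

    handler = FLockFileHandler('app.log')  # in place of logging.FileHandler
    handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    root.info('each process can safely log to the same file')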
"""

import logging
import os
import sys

#
# Unix or non-Unix platform? (i.e. is the fcntl module available?)
try:
    # Unix/Linux
    import fcntl
    __all__ = (
        # abstract classes:
        'MultiprocessRLock',
        'MultiprocessFileHandler',
        'LockedFileHandler',
        # fcntl.flock()-based implementation:
        'FLockRLock',
        'FLockFileHandler',
    )
except ImportError:
    # non-Unix
    fcntl = None
    __all__ = (
        # abstract classes only:
        'MultiprocessRLock',
        'MultiprocessFileHandler',
        'LockedFileHandler',
    )

#
# Real or dummy threading?
try:
    import threading
except ImportError:
    import dummy_threading as threading

#
# Python 2.x or 3.x?
try:
    # 2.x (including < 2.6)
    try:
        from thread import get_ident as get_thread_ident
    except ImportError:
        from dummy_thread import get_ident as get_thread_ident
except ImportError:
    # 3.x
    def get_thread_ident(get_current_thread=threading.current_thread):
        return get_current_thread().ident



#
# Abstract classes
#

class MultiprocessRLock(object):

    "Interprocess and interthread recursive lock (abstract class)."

    def __init__(self):
        self._threading_lock = threading.Lock()
        self._owner = None
        self._count = 0

    def __repr__(self):
        return '<%s owner=%s count=%d>' % (self.__class__.__name__,
                                           self._owner, self._count)

    def _interprocess_lock_acquire(self, blocking):  # abstract method
        # the implementing function should return:
        # * True on success
        # * False on failure (applies to non-blocking mode)
        raise NotImplementedError

    def _interprocess_lock_release(self):  # abstract method
        raise NotImplementedError

    @staticmethod
    def _get_me(getpid=os.getpid, get_thread_ident=get_thread_ident):
        return '%d:%d' % (getpid(), get_thread_ident())

    def acquire(self, blocking=1):
        me = self._get_me()
        if self._owner == me:
            self._count += 1
            return True
        if not self._threading_lock.acquire(blocking):
            return False
        acquired = False
        try:
            acquired = self._interprocess_lock_acquire(blocking)
        finally:
            if not acquired:
                # important to be placed within the finally-block
                self._threading_lock.release()
            else:
                self._owner = me
                self._count = 1
        return acquired

    __enter__ = acquire

    def release(self):
        if self._owner != self._get_me():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if not self._count:
            self._owner = None
            self._interprocess_lock_release()
            self._threading_lock.release()

    def __exit__(self, *args, **kwargs):
        self.release()



class MultiprocessFileHandler(logging.FileHandler):

    "Multiprocess-safe logging.FileHandler replacement (abstract class)."

    def createLock(self):  # abstract method
        "Create a lock for serializing access to the underlying I/O."
        raise NotImplementedError



class LockedFileHandler(MultiprocessFileHandler):

    "File-locking based logging.FileHandler replacement (abstract class)."

    def __init__(self, filename, mode='a', encoding=None, delay=0):
        "Open the specified file and use it for logging and file locking."
        if delay:
            raise ValueError('cannot initialize LockedFileHandler'
                             ' instance with non-zero delay')
        # the base class's __init__() calls the createLock() method before
        # setting self.stream -- so we have to mask that method temporarily:
        self.createLock = lambda: None
        MultiprocessFileHandler.__init__(self, filename, mode, encoding)
        del self.createLock  # now unmask...
        self.createLock()    # ...and call it



if fcntl is not None:

    #
    # Unix/Linux implementation
    #

    class FLockRLock(MultiprocessRLock):

        "flock-based MultiprocessRLock implementation (Unix/Linux only)."

        def __init__(self, lockfile):
            MultiprocessRLock.__init__(self)
            self.lockfile = lockfile

        def _interprocess_lock_acquire(self, blocking,
                                       flock=fcntl.flock,
                                       flags=(fcntl.LOCK_EX | fcntl.LOCK_NB,
                                              fcntl.LOCK_EX),
                                       exc_info=sys.exc_info):
            try:
                flock(self.lockfile, flags[blocking])
            except IOError:
                # Python 2.x/3.x-compatible way to get
                # the exception object: call sys.exc_info()
                if exc_info()[1].errno in (11, 13):  # EAGAIN or EACCES
                    return False  # <- applies to non-blocking mode only
                raise
            else:
                return True

        def _interprocess_lock_release(self, flock=fcntl.flock,
                                       LOCK_UN=fcntl.LOCK_UN):
            flock(self.lockfile, LOCK_UN)



    class FLockFileHandler(LockedFileHandler):

        "LockedFileHandler implementation using FLockRLock (Unix/Linux only)."

        def createLock(self):
            "Create a lock for serializing access to the underlying I/O."
            self.lock = FLockRLock(self.stream)

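
#
# Illustrative usage sketch (not part of the original recipe): FLockRLock
# used directly as an interprocess lock around writes to a shared file.
# The file name 'shared.txt' and the messages are arbitrary examples; on
# Python 2.6+/3.x the lock can also be used with the `with` statement,
# thanks to its __enter__()/__exit__() methods.
#
if __name__ == '__main__' and fcntl is not None:
    shared_file = open('shared.txt', 'a')
    try:
        lock = FLockRLock(shared_file)
        if lock.acquire():                    # blocking acquire
            try:
                shared_file.write('%d: exclusive access\n' % os.getpid())
                shared_file.flush()
            finally:
                lock.release()
        if lock.acquire(blocking=0):          # non-blocking attempt
            try:
                shared_file.write('%d: acquired without waiting\n' % os.getpid())
                shared_file.flush()
            finally:
                lock.release()
    finally:
        shared_file.close()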