# http://code.activestate.com/recipes/577395-multiprocess-safe-logging-file-handler/
#
# Copyright (c) 2010 Jan Kaliszewski (zuo). Licensed under the PSF License.
#
# MultiprocessRLock acquire()/release() methods patterned, to some extent,
# after threading.RLock acquire()/release() of the Python standard library.
"""
Multiprocess-safe logging and interprocess locking classes.
A Python 2.x/3.x-compatibile multiprocess-safe logging file-handler
(logging.FileHandler replacement, designed for logging to a single file from
multiple independent processes) together with a simple interprocess RLock.
The module contains:
* universal abstract classes:
MultiprocessRLock, MultiprocessFileHandler, LockedFileHandler,
* Unix/Linux-only example implementation (with flock-based locking):
FLockRLock and FLockFileHandler classes.
Tested under Debian GNU/Linux, with Python 2.4, 2.5, 2.6 and 3.1.
"""
import errno
import logging
import os
import sys
#
# Unix or non-Unix platform? (i.e. does the platform provide the fcntl module?)
try:
# Unix/Linux
import fcntl
__all__ = (
# abstract classes:
'MultiprocessRLock',
'MultiprocessFileHandler',
'LockedFileHandler',
# fcntl.flock()-based implementation:
'FLockRLock',
'FLockFileHandler',
)
except ImportError:
# non-Unix
fcntl = None
__all__ = (
# abstract classes only:
'MultiprocessRLock',
'MultiprocessFileHandler',
'LockedFileHandler',
)
#
# Real or dummy threading?
try:
import threading
except ImportError:
import dummy_threading as threading
#
# Python 2.x or 3.x?
try:
# 2.x (including < 2.6)
try:
from thread import get_ident as get_thread_ident
except ImportError:
from dummy_thread import get_ident as get_thread_ident
except ImportError:
# 3.x
def get_thread_ident(get_current_thread=threading.current_thread):
return get_current_thread().ident
#
# Abstract classes
#
class MultiprocessRLock(object):
"Interprocess and interthread recursive lock (abstract class)."
def __init__(self):
self._threading_lock = threading.Lock()
self._owner = None
self._count = 0
def __repr__(self):
return '<%s owner=%s count=%d>' % (self.__class__.__name__,
self._owner, self._count)
def _interprocess_lock_acquire(self, blocking): # abstract method
# the implementing function should return:
# * True on success
# * False on failure (applies to non-blocking mode)
raise NotImplementedError
def _interprocess_lock_release(self): # abstract method
raise NotImplementedError
@staticmethod
def _get_me(getpid=os.getpid, get_thread_ident=get_thread_ident):
return '%d:%d' % (getpid(), get_thread_ident())
def acquire(self, blocking=1):
me = self._get_me()
if self._owner == me:
self._count += 1
return True
if not self._threading_lock.acquire(blocking):
return False
acquired = False
try:
acquired = self._interprocess_lock_acquire(blocking)
finally:
if not acquired:
                # important: this release must stay inside the finally block
self._threading_lock.release()
else:
self._owner = me
self._count = 1
return acquired
__enter__ = acquire
def release(self):
if self._owner != self._get_me():
raise RuntimeError("cannot release un-acquired lock")
self._count -= 1
if not self._count:
self._owner = None
self._interprocess_lock_release()
self._threading_lock.release()
def __exit__(self, *args, **kwargs):
self.release()
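# Usage note (a minimal sketch; it relies on the FLockRLock subclass defined
# further below, and 'shared.lock' is only an example path):
#
#     lockfile = open('shared.lock', 'a')
#     lock = FLockRLock(lockfile)
#     with lock:        # acquires the thread lock plus the interprocess lock
#         with lock:    # re-entrant: same process+thread only bumps the counter
#             pass      # ...protected code here...
#     # fully released at this point; other processes/threads may acquire it
#     lockfile.close()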
class MultiprocessFileHandler(logging.FileHandler):
"Multiprocess-safe logging.FileHandler replacement (abstract class)."
def createLock(self): # abstract method
"Create a lock for serializing access to the underlying I/O."
raise NotImplementedError
class LockedFileHandler(MultiprocessFileHandler):
"File-locking based logging.FileHandler replacement (abstract class)."
def __init__(self, filename, mode='a', encoding=None, delay=0):
"Open the specified file and use it for logging and file locking."
if delay:
raise ValueError('cannot initialize LockedFileHandler'
' instance with non-zero delay')
        # the base class's __init__() calls the createLock() method before
        # setting self.stream -- so we have to mask that method temporarily:
self.createLock = lambda: None
MultiprocessFileHandler.__init__(self, filename, mode, encoding)
del self.createLock # now unmask...
self.createLock() # ...and call it
if fcntl is not None:
#
# Unix/Linux implementation
#
class FLockRLock(MultiprocessRLock):
"flock-based MultiprocessRLock implementation (Unix/Linux only)."
def __init__(self, lockfile):
MultiprocessRLock.__init__(self)
self.lockfile = lockfile
def _interprocess_lock_acquire(self, blocking,
flock=fcntl.flock,
flags=(fcntl.LOCK_EX | fcntl.LOCK_NB,
fcntl.LOCK_EX),
exc_info=sys.exc_info):
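            # (note: fcntl.flock, the flag pair and sys.exc_info are bound as
            # default arguments, so they are looked up once, at definition
            # time, and stay usable even late during interpreter shutdown)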
try:
flock(self.lockfile, flags[blocking])
except IOError:
                # Python 2.x & 3.x compatible way to get the exception
                # object: call sys.exc_info()
                if exc_info()[1].errno in (errno.EAGAIN, errno.EACCES):
return False # <- applies to non-blocking mode only
raise
else:
return True
def _interprocess_lock_release(self, flock=fcntl.flock,
LOCK_UN=fcntl.LOCK_UN):
flock(self.lockfile, LOCK_UN)
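    # Non-blocking use of FLockRLock (a sketch; 'shared.lock' is an example
    # path):
    #
    #     lockfile = open('shared.lock', 'a')
    #     lock = FLockRLock(lockfile)
    #     if lock.acquire(blocking=0):  # returns False at once if unavailable
    #         try:
    #             lockfile.write('protected write\n')
    #             lockfile.flush()
    #         finally:
    #             lock.release()
    #     else:
    #         print('another process or thread holds the lock')
    #     lockfile.close()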
class FLockFileHandler(LockedFileHandler):
"LockedFileHandler implementation using FLockRLock (Unix/Linux only)."
def createLock(self):
"Create a lock for serializing access to the underlying I/O."
self.lock = FLockRLock(self.stream)
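# Multi-process demo (a sketch, Unix-only because of os.fork(); 'demo.log'
# and the record/process counts are only examples):
#
#     import logging, os
#
#     def run_child(proc_i):
#         handler = FLockFileHandler('demo.log')     # one handler per process
#         handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
#         logger = logging.getLogger()
#         logger.setLevel(logging.INFO)
#         logger.addHandler(handler)
#         for rec_i in range(1000):
#             logger.info('proc:%d rec:%d' % (proc_i, rec_i))
#
#     for proc_i in range(4):
#         if not os.fork():   # child process
#             run_child(proc_i)
#             os._exit(0)
#     for proc_i in range(4):
#         os.wait()           # parent waits for all children
#     # every record appears as a complete, non-interleaved line in demo.log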