A Python 2.x/3.x-compatible, multiprocess-safe logging file handler (a logging.FileHandler replacement designed for logging to a single file from multiple independent processes), together with a simple interprocess recursive lock -- universal abstract classes plus a Unix/Linux implementation.
Update: this is a deeply revised version. In particular, it now --
- is Python 2.4, 2.5, 2.6 and 3.1-compatible (previously Py>=2.6 was required); it probably also works with 2.7, 3.0 and 3.2 (not tested);
- is multiprocess-safe as well as thread-safe (previously thread safety within a process was missing);
- is based on public interfaces only (previously FileHandler._open() was called and overridden);
- implements the full RLock instance interface, as documented for threading.RLock (previously the non-blocking mode and the context-manager interface were missing) -- see the short sketch below.
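For illustration, a minimal sketch of that interface, using the Unix implementation described below (the lock-file path is made up for the example; under Python 2.4, where the with statement is unavailable, use acquire()/release() directly):

from multiprocessfilehandler import FLockRLock

lockfile = open('/tmp/example.lock', 'a')
lock = FLockRLock(lockfile)
if lock.acquire(blocking=0):  # non-blocking attempt: returns False instead of waiting
    try:
        pass  # ...critical section...
    finally:
        lock.release()
with lock:  # context-manager interface (blocking acquire + release)
    pass  # ...critical section...
lockfile.close()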
The module contains:
- a Unix/Linux-only example implementation (with flock-based locking): the FLockRLock and FLockFileHandler classes;
- universal abstract classes, which may be useful when developing implementations for non-Unix platforms: MultiprocessRLock, MultiprocessFileHandler, LockedFileHandler.
A quick-and-dirty test has also been added.
It is still an alpha version -- I'll be very grateful for any feedback.
Further updates:
2010-09-20: Some corrections -- especially: a non-blocking-mode bug in MultiprocessRLock.acquire() fixed; the _test() function improved; plus fixes in the description below.
2010-09-22: _test() improved and moved to the description section. A mistaken copyright notice removed.
# http://code.activestate.com/recipes/577395-multiprocess-safe-logging-file-handler/
#
# Copyright (c) 2010 Jan Kaliszewski (zuo). Licensed under the PSF License.
#
# MultiprocessRLock acquire()/release() methods patterned, to some extent,
# after threading.RLock acquire()/release() of Python standard library.
"""
Multiprocess-safe logging and interprocess locking classes.
A Python 2.x/3.x-compatible multiprocess-safe logging file-handler
(logging.FileHandler replacement, designed for logging to a single file from
multiple independent processes) together with a simple interprocess RLock.
The module contains:
* universal abstract classes:
MultiprocessRLock, MultiprocessFileHandler, LockedFileHandler,
* Unix/Linux-only example implementation (with flock-based locking):
FLockRLock and FLockFileHandler classes.
Tested under Debian GNU/Linux, with Python 2.4, 2.5, 2.6 and 3.1.
"""
import logging
import os
import sys
#
# Unix or non-Unix platform? (supporting or not supporting the fcntl module)
try:
# Unix/Linux
import fcntl
__all__ = (
# abstract classes:
'MultiprocessRLock',
'MultiprocessFileHandler',
'LockedFileHandler',
# fcntl.flock()-based implementation:
'FLockRLock',
'FLockFileHandler',
)
except ImportError:
# non-Unix
fcntl = None
__all__ = (
# abstract classes only:
'MultiprocessRLock',
'MultiprocessFileHandler',
'LockedFileHandler',
)
#
# Real or dummy threading?
try:
import threading
except ImportError:
import dummy_threading as threading
#
# Python 2.x or 3.x?
try:
# 2.x (including < 2.6)
try:
from thread import get_ident as get_thread_ident
except ImportError:
from dummy_thread import get_ident as get_thread_ident
except ImportError:
# 3.x
def get_thread_ident(get_current_thread=threading.current_thread):
return get_current_thread().ident
#
# Abstract classes
#
class MultiprocessRLock(object):
"Interprocess and interthread recursive lock (abstract class)."
def __init__(self):
self._threading_lock = threading.Lock()
self._owner = None
self._count = 0
def __repr__(self):
return '<%s owner=%s count=%d>' % (self.__class__.__name__,
self._owner, self._count)
def _interprocess_lock_acquire(self, blocking): # abstract method
# the implementing function should return:
# * True on success
# * False on failure (applies to non-blocking mode)
raise NotImplementedError
def _interprocess_lock_release(self): # abstract method
raise NotImplementedError
@staticmethod
def _get_me(getpid=os.getpid, get_thread_ident=get_thread_ident):
return '%d:%d' % (getpid(), get_thread_ident())
def acquire(self, blocking=1):
me = self._get_me()
if self._owner == me:
self._count += 1
return True
if not self._threading_lock.acquire(blocking):
return False
acquired = False
try:
acquired = self._interprocess_lock_acquire(blocking)
finally:
if not acquired:
# important to be placed within the finally-block
self._threading_lock.release()
else:
self._owner = me
self._count = 1
return acquired
__enter__ = acquire
def release(self):
if self._owner != self._get_me():
raise RuntimeError("cannot release un-acquired lock")
self._count -= 1
if not self._count:
self._owner = None
self._interprocess_lock_release()
self._threading_lock.release()
def __exit__(self, *args, **kwargs):
self.release()
class MultiprocessFileHandler(logging.FileHandler):
"Multiprocess-safe logging.FileHandler replacement (abstract class)."
def createLock(self): # abstract method
"Create a lock for serializing access to the underlying I/O."
raise NotImplementedError
class LockedFileHandler(MultiprocessFileHandler):
"File-locking based logging.FileHandler replacement (abstract class)."
def __init__(self, filename, mode='a', encoding=None, delay=0):
"Open the specified file and use it for logging and file locking."
if delay:
raise ValueError('cannot initialize LockedFileHandler'
' instance with non-zero delay')
        # the base class's __init__() calls the createLock() method before setting
# self.stream -- so we have to mask that method temporarily:
self.createLock = lambda: None
MultiprocessFileHandler.__init__(self, filename, mode, encoding)
del self.createLock # now unmask...
self.createLock() # ...and call it
if fcntl is not None:
#
# Unix/Linux implementation
#
class FLockRLock(MultiprocessRLock):
"flock-based MultiprocessRLock implementation (Unix/Linux only)."
def __init__(self, lockfile):
MultiprocessRLock.__init__(self)
self.lockfile = lockfile
def _interprocess_lock_acquire(self, blocking,
flock=fcntl.flock,
flags=(fcntl.LOCK_EX | fcntl.LOCK_NB,
fcntl.LOCK_EX),
exc_info=sys.exc_info):
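            # (binding fcntl attributes as argument defaults resolves the
            # names once, at function definition time -- a common speed
            # micro-optimization)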
try:
flock(self.lockfile, flags[blocking])
except IOError:
                # Python 2.x & 3.x -compatible way to get
# the exception object: call sys.exc_info()
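                # (errno 11 == EAGAIN/EWOULDBLOCK, 13 == EACCES --
                # i.e. the lock is held elsewhere)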
if exc_info()[1].errno in (11, 13):
return False # <- applies to non-blocking mode only
raise
else:
return True
def _interprocess_lock_release(self, flock=fcntl.flock,
LOCK_UN=fcntl.LOCK_UN):
flock(self.lockfile, LOCK_UN)
class FLockFileHandler(LockedFileHandler):
"LockedFileHandler implementation using FLockRLock (Unix/Linux only)."
def createLock(self):
"Create a lock for serializing access to the underlying I/O."
self.lock = FLockRLock(self.stream)
Use FLockFileHandler like logging.FileHandler -- except that the 'delay' argument must be zero/false or omitted.
fcntl.flock()-based file locking does not seem to incur dramatic overhead (results of a quick test on my machine -- 12 processes * 3 threads each * 1000 log records == 36000 log records: logging.FileHandler about 8 seconds, FLockFileHandler about 11 seconds).
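For example, a minimal setup sketch (the log-file name and logger name are made up for the example; otherwise this mirrors the standard logging.FileHandler pattern):

import logging
import os
from multiprocessfilehandler import FLockFileHandler

handler = FLockFileHandler('app.log')  # 'delay' left at its default of 0
handler.setFormatter(logging.Formatter('%(asctime)s %(process)d %(message)s'))
logger = logging.getLogger('example')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('hello from process %d', os.getpid())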
FLockRLock can also be used by itself -- like threading.RLock, except that:
- the lock file object must be passed into the constructor,
- you can (that's the point!) synchronize multiple independent processes, not only threads;
please ensure that your locks use the same file path but different file descriptors -- a short sketch follows; for background, see: http://code.activestate.com/recipes/577404-fcntlflock-unix-file-lock-behaviour-sampling-scrip/
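A minimal sketch of that pattern (Unix-only; the lock-file path is made up for the example) -- each process opens its own descriptor to the same path:

import os
from multiprocessfilehandler import FLockRLock

def make_lock(path='/tmp/shared.lock'):
    # open a fresh descriptor in each process: flock() ties locks to
    # open file descriptions, not to file paths
    return FLockRLock(open(path, 'a'))

if os.fork() == 0:
    # child process -- its own descriptor, hence its own lock holder
    with make_lock():
        print('child holds the lock')
    os._exit(0)
# parent process -- another descriptor to the same path
with make_lock():
    print('parent holds the lock')
os.wait()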
A quick'n'dirty test script (Unix/Linux only):
#!/usr/bin/env python
from multiprocessfilehandler import *
import logging
import os
import re
import sys
import threading
from itertools import islice, takewhile
from os.path import abspath
from random import randint
try:
from itertools import filterfalse # Py3.x
except ImportError:
from itertools import ifilterfalse as filterfalse # Py2.x
try: irange = xrange
except NameError: # Py2's xrange() is range() in Py3.x
irange = range
try: inext = next
except NameError:
inext = lambda i: i.next()
# constants
PY_VER = sys.version[:3]
DEFAULT_FILENAME = 'test.log'
LOG_FORMAT = '%(asctime)s %(message)s'
REC_BODY_PATTERN = 'proc:%(pid)d thread:%(thread_ident)d rec:%%s'
LOCK_DESCR = 'per-thread', 'thread-shared'
POSSIBLE_RESULTS = 'acquired', 'released', 'not acquired'
FILLER_AFTER_NOACK = 'so nothing to release :)'
RECORD_REGEX = re.compile(
r'('
r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} ' # time
r'|'
r'-{24}' # written with stream.write()
r')'
r'%(py_ver)s'
r' proc:\d+ thread:\d+ rec:'
r'('
r'\d+' # record counter
r'|'
r'(%(msg_pattern)s)'
r'|'
r'%(filler_after_noack)s'
r')$'
% dict(
py_ver=r'[\d\.]{%d}' % len(PY_VER),
msg_pattern=r') ('.join((
r'|'.join(map(re.escape, LOCK_DESCR)),
r'<FLockRLock owner=\w+ count=\d+>',
r'|'.join(map(re.escape, POSSIBLE_RESULTS)),
)),
filler_after_noack=re.escape(FILLER_AFTER_NOACK),
)
)
def for_subthread(thread_shared_lock, thread_i, proc_i,
logrecords, locktests, filename):
# FLockFileHandler test
logger = logging.getLogger()
rec_pattern = ' '.join((PY_VER, REC_BODY_PATTERN
% dict(pid=proc_i,
thread_ident=thread_i)))
for rec_i in irange(logrecords):
logger.info(rec_pattern % rec_i)
# additional per-thread/thread-shared -files-based FLockRLock tests
per_thread_lockfile = open(abspath(filename), 'a')
try:
per_thread_lock = FLockRLock(per_thread_lockfile)
descr2locks = {'per-thread': per_thread_lock,
'thread-shared': thread_shared_lock}
msg_pattern = rec_pattern % '%s %s %s'
msg = dict((result,
dict((lock, msg_pattern % (descr, lock, result))
for descr, lock in descr2locks.items()))
for result in POSSIBLE_RESULTS)
msg_acquired = msg['acquired']
for lock, m in msg_acquired.items():
# to be written directly to the file -- to avoid deadlock
msg_acquired[lock] = ''.join((24 * '-', m, '\n'))
filler_after_noack = rec_pattern % FILLER_AFTER_NOACK
locks = list(descr2locks.values()) # Py3's .values() -> a view
for i in irange(locktests):
if randint(0, 1):
iterlocks = iter(locks)
else:
iterlocks = reversed(locks)
for lock in iterlocks:
if lock.acquire(blocking=randint(0, 1)):
try:
lock.lockfile.write(msg['acquired'][lock])
lock.lockfile.flush()
finally:
lock.release()
logger.info(msg['released'][lock])
else:
logger.info(msg['not acquired'][lock])
logger.info(filler_after_noack)
finally:
per_thread_lockfile.close()
def for_subprocess(proc_i, subthreads, logrecords, locktests, filename):
# setting up logging to test FLockFileHandler
f = logging.Formatter(LOG_FORMAT)
h = FLockFileHandler(filename)
h.setFormatter(f)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(h)
# (standalone FLockRLock instance also to be tested)
thread_shared_lockfile = open(abspath(filename), 'a')
try:
thread_shared_lock = FLockRLock(thread_shared_lockfile)
threads = [threading.Thread(target=for_subthread,
args=(thread_shared_lock,
thread_i, proc_i,
logrecords, locktests,
filename))
for thread_i in irange(subthreads)]
for t in threads: t.start()
for t in threads: t.join() # wait for subthreads
finally:
thread_shared_lockfile.close()
def check_records_only(filename):
logfile = open(abspath(filename))
try:
try:
badline = inext(filterfalse(RECORD_REGEX.match, logfile))
except StopIteration:
return "OK"
else:
sys.exit('Invalid record found: %s' % badline)
finally:
logfile.close()
def check_records_and_len(filename, expected_len):
logfile = open(abspath(filename))
try:
    # Py2.4-compatible fast way to check file content and length
file_ending = islice(takewhile(RECORD_REGEX.match, logfile),
expected_len - 1, expected_len + 1)
try:
inext(file_ending)
except StopIteration:
sys.exit('Too few valid lines found (%d expected)'
% expected_len)
# at this point the file content should have been read entirely
try:
inext(file_ending)
except StopIteration:
return "OK"
else:
sys.exit('Too many valid (?) lines found (%d expected)'
% expected_len)
finally:
logfile.close()
def main(subprocs=3, subthreads=3, logrecords=5000,
locktests=500, firstdelete=1, filename=DEFAULT_FILENAME):
    # args may originate from the command line, so we map them to int
(subprocs, subthreads, logrecords, firstdelete, locktests
) = map(int, (subprocs, subthreads, logrecords, firstdelete, locktests))
# expected number of generated log records
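    # (each lock-test iteration adds 4 lines per thread: for each of the
    # 2 locks, either 'acquired' written directly + a 'released' record,
    # or a 'not acquired' record + a filler record)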
expected_len = subprocs * subthreads * (logrecords + (4 * locktests))
if firstdelete:
try:
os.remove(abspath(filename))
except OSError:
pass
for proc_i in irange(subprocs):
if not os.fork():
# we are in a subprocess
for_subprocess(proc_i, subthreads, logrecords,
locktests, filename)
break
else:
# we are in the parent process
for i in irange(subprocs):
os.wait() # wait for subprocesses
# finally, check the resulting log file content
if firstdelete:
print(check_records_and_len(filename, expected_len))
else:
print(check_records_only(filename))
# try running the script simultaneously using different Python versions :)
if __name__ == '__main__':
main(*sys.argv[1:])
Very interesting. I faced unwanted log rollovers using a rotating log handler shared by many concurrent processes, and this could be the solution -- but it would have to be subclassed from (Timed)RotatingFileHandler. Do you think that's possible?
Thanks! Cri
Hi, Cristiano.
It'd be possible after some deeper improvements -- not just like that, unfortunately. This recipe was deliberately written to be as simple as possible.
You may be interested in ConcurrentLogHandler (written by Lowell Alleman) -- see: http://pypi.python.org/pypi/ConcurrentLogHandler/0.8.4 -- a fairly simple replacement for RotatingFileHandler.