Welcome, guest | Sign In | My Account | Store | Cart

A Python 2.x/3.x-compatibile multiprocess-safe logging file-handler (logging.FileHandler replacement, designed for logging to a single file from multiple independent processes) together with a simple interprocess recursive lock -- universal abstract classes + Unix/Linux implementation.

Update: It's is a deeply revised version. Especially, now it --

  • is Python 2.4, 2.5, 2.6, 3.1 -compatibile (previously Py>=2.6 was needed); probably works also with 2.7, 3.0 and 3.2 (but not tested if it does);
  • is multiprocess-safe as well as thread-safe (proviously thread safety within a process was missed);
  • is based on public interfaces only (previously FileHandler._open() was called and overriden);
  • implement full RLock instance interface, as documented for threading.RLock (previously non-blocking mode and context-manager interface were missing).

The module contains:

  • Unix/Linux-only example implementation (with flock-based locking): FLockRLock and FLockFileHandler classes.
  • universal abstract classes -- which may be useful at developing implementation for non-Unix platforms: MultiprocessRLock, MultiprocessFileHandler, LockedFileHandler,

Also a quick-and-dirty test was added.

It is still an alpha version -- I'll be very grateful for any feedback.


Further updates:

  • 2010-09-20: Some corrections, especially: non-blocking mode bug in MultiprocessRLock.acquire() fixed; _test() function improved; plus fixes in the description below.

  • 2010-09-22: _test() improved and moved to description section. Mistaken copyright-notice removed.

Python, 209 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# http://code.activestate.com/recipes/577395-multiprocess-safe-logging-file-handler/
#
# Copyright (c) 2010 Jan Kaliszewski (zuo). Licensed under the PSF License.
#
# MultiprocessRLock acquire()/release() methods patterned, to some extent,
# after threading.RLock acquire()/release() of Python standard library.

"""
Multiprocess-safe logging and interprocess locking classes.

A Python 2.x/3.x-compatibile multiprocess-safe logging file-handler
(logging.FileHandler replacement, designed for logging to a single file from
multiple independent processes) together with a simple interprocess RLock.

The module contains:
* universal abstract classes:
  MultiprocessRLock, MultiprocessFileHandler, LockedFileHandler,
* Unix/Linux-only example implementation (with flock-based locking):
  FLockRLock and FLockFileHandler classes.

Tested under Debian GNU/Linux, with Python 2.4, 2.5, 2.6 and 3.1.
"""

import logging
import os
import sys

#
# Unix or non-Unix platform? (supporting or not supporting the fcntl module)
try:
    # Unix/Linux
    import fcntl
    __all__ = (
        # abstract classes:
        'MultiprocessRLock',
        'MultiprocessFileHandler',
        'LockedFileHandler',
        # fcntl.flock()-based implementation:
        'FLockRLock',
        'FLockFileHandler',
    )
except ImportError:
    # non-Unix
    fcntl = None
    __all__ = (
        # abstract classes only:
        'MultiprocessRLock',
        'MultiprocessFileHandler',
        'LockedFileHandler',
    )

#
# Real or dummy threading?
try:
    import threading
except ImportError:
    import dummy_threading as threading

#
# Python 2.x or 3.x?
try:
    # 2.x (including < 2.6)
    try:
        from thread import get_ident as get_thread_ident
    except ImportError:
        from dummy_thread import get_ident as get_thread_ident
except ImportError:
    # 3.x
    def get_thread_ident(get_current_thread=threading.current_thread):
        return get_current_thread().ident



#
# Abstract classes
#

class MultiprocessRLock(object):

    "Interprocess and interthread recursive lock (abstract class)."

    def __init__(self):
        self._threading_lock = threading.Lock()
        self._owner = None
        self._count = 0

    def __repr__(self):
        return '<%s owner=%s count=%d>' % (self.__class__.__name__,
                                           self._owner, self._count)

    def _interprocess_lock_acquire(self, blocking):  # abstract method
        # the implementing function should return:
        # * True on success
        # * False on failure (applies to non-blocking mode)
        raise NotImplementedError

    def _interprocess_lock_release(self):  # abstract method
        raise NotImplementedError

    @staticmethod
    def _get_me(getpid=os.getpid, get_thread_ident=get_thread_ident):
        return '%d:%d' % (getpid(), get_thread_ident())

    def acquire(self, blocking=1):
        me = self._get_me()
        if self._owner == me:
            self._count += 1
            return True
        if not self._threading_lock.acquire(blocking):
            return False
        acquired = False
        try:
            acquired = self._interprocess_lock_acquire(blocking)
        finally:
            if not acquired:
                # important to be placed within the finally-block
                self._threading_lock.release()
            else:
                self._owner = me
                self._count = 1
        return acquired

    __enter__ = acquire

    def release(self):
        if self._owner != self._get_me():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if not self._count:
            self._owner = None
            self._interprocess_lock_release()
            self._threading_lock.release()

    def __exit__(self, *args, **kwargs):
        self.release()



class MultiprocessFileHandler(logging.FileHandler):

    "Multiprocess-safe logging.FileHandler replacement (abstract class)."

    def createLock(self):  # abstract method
        "Create a lock for serializing access to the underlying I/O."
        raise NotImplementedError



class LockedFileHandler(MultiprocessFileHandler):

    "File-locking based logging.FileHandler replacement (abstract class)."

    def __init__(self, filename, mode='a', encoding=None, delay=0):
        "Open the specified file and use it for logging and file locking."
        if delay:
            raise ValueError('cannot initialize LockedFileHandler'
                             ' instance with non-zero delay')
        # base classe's __init__() calls createLock() method before setting
        # self.stream -- so we have to mask that method temporarily:
        self.createLock = lambda: None
        MultiprocessFileHandler.__init__(self, filename, mode, encoding)
        del self.createLock  # now unmask...
        self.createLock()    # ...and call it



if fcntl is not None:

    #
    # Unix/Linux implementation
    #

    class FLockRLock(MultiprocessRLock):

        "flock-based MultiprocessRLock implementation (Unix/Linux only)."

        def __init__(self, lockfile):
            MultiprocessRLock.__init__(self)
            self.lockfile = lockfile

        def _interprocess_lock_acquire(self, blocking,
                                       flock=fcntl.flock,
                                       flags=(fcntl.LOCK_EX | fcntl.LOCK_NB,
                                              fcntl.LOCK_EX),
                                       exc_info=sys.exc_info):
            try:
                flock(self.lockfile, flags[blocking])
            except IOError:
                # Python 2.x & 3.x -compatibile way to get
                # the exception object: call sys.exc_info()
                if exc_info()[1].errno in (11, 13):
                    return False  # <- applies to non-blocking mode only
                raise
            else:
                return True

        def _interprocess_lock_release(self, flock=fcntl.flock,
                                       LOCK_UN=fcntl.LOCK_UN):
            flock(self.lockfile, LOCK_UN)



    class FLockFileHandler(LockedFileHandler):

        "LockedFileHandler implementation using FLockRLock (Unix/Linux only)."

        def createLock(self):
            "Create a lock for serializing access to the underlying I/O."
            self.lock = FLockRLock(self.stream)

Use FLockFileHandler like logging.FileHandler -- except that 'delay' argument must be zero/false or omitted;

fcntl.flock()-based file locking does not seem to incur dramatic overhead (results of quick test on my machine: 12 processes * 3 threads in each * 1000 log records == 36000 log records: logging.FileHandler -- about 8 seconds, FLockFileHandler -- about 11 seconds).


FLockRLock can also be used by itself -- like threading.Rlock, except that:

  • the lock file object must be passed into the constructor,
  • you can (that's the point!) synchronize multiple independent processes, not only threads;

please ensure that your locks use the same file path but different file descriptors -- see: http://code.activestate.com/recipes/577404-fcntlflock-unix-file-lock-behaviour-sampling-scrip/


A quick'n'dirty test script ((Unix/Linux only):

#!/usr/bin/env python

from multiprocessfilehandler import *

import logging
import os
import re
import sys
import threading

from itertools import islice, takewhile
from os.path import abspath
from random import randint

try:
    from itertools import filterfalse  # Py3.x
except ImportError:
    from itertools import ifilterfalse as filterfalse  # Py2.x

try: irange = xrange
except NameError:  # Py2's xrange() is range() in Py3.x
    irange = range

try: inext = next
except NameError:
    inext = lambda i: i.next()

# constants

PY_VER = sys.version[:3]
DEFAULT_FILENAME = 'test.log'
LOG_FORMAT = '%(asctime)s %(message)s'
REC_BODY_PATTERN = 'proc:%(pid)d thread:%(thread_ident)d rec:%%s'

LOCK_DESCR = 'per-thread', 'thread-shared'
POSSIBLE_RESULTS = 'acquired', 'released', 'not acquired'
FILLER_AFTER_NOACK = 'so nothing to release :)'

RECORD_REGEX = re.compile(
    r'('
        r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} '  # time
        r'|'
        r'-{24}'  # written with stream.write()
    r')'
    r'%(py_ver)s'
    r' proc:\d+ thread:\d+ rec:'
    r'('
        r'\d+'  # record counter
        r'|'
        r'(%(msg_pattern)s)'
        r'|'
        r'%(filler_after_noack)s'
    r')$'
    % dict(
        py_ver=r'[\d\.]{%d}' % len(PY_VER),
        msg_pattern=r') ('.join((
            r'|'.join(map(re.escape, LOCK_DESCR)),
            r'<FLockRLock owner=\w+ count=\d+>',
            r'|'.join(map(re.escape, POSSIBLE_RESULTS)),
        )),
        filler_after_noack=re.escape(FILLER_AFTER_NOACK),
    )
)


def for_subthread(thread_shared_lock, thread_i, proc_i,
                  logrecords, locktests, filename):

    # FLockFileHandler test
    logger = logging.getLogger()
    rec_pattern = ' '.join((PY_VER, REC_BODY_PATTERN
                                    % dict(pid=proc_i,
                                           thread_ident=thread_i)))
    for rec_i in irange(logrecords):
        logger.info(rec_pattern % rec_i)

    # additional per-thread/thread-shared -files-based FLockRLock tests
    per_thread_lockfile = open(abspath(filename), 'a')
    try:
        per_thread_lock = FLockRLock(per_thread_lockfile)
        descr2locks = {'per-thread': per_thread_lock,
                       'thread-shared': thread_shared_lock}
        msg_pattern = rec_pattern % '%s %s %s'
        msg = dict((result,
                    dict((lock, msg_pattern % (descr, lock, result))
                         for descr, lock in descr2locks.items()))
                   for result in POSSIBLE_RESULTS)
        msg_acquired = msg['acquired']
        for lock, m in msg_acquired.items():
            # to be written directly to the file -- to avoid deadlock
            msg_acquired[lock] = ''.join((24 * '-', m, '\n'))
        filler_after_noack = rec_pattern % FILLER_AFTER_NOACK
        locks = list(descr2locks.values())  # Py3's .values() -> a view
        for i in irange(locktests):
            if randint(0, 1):
                iterlocks = iter(locks)
            else:
                iterlocks = reversed(locks)
            for lock in iterlocks:
                if lock.acquire(blocking=randint(0, 1)):
                    try:
                        lock.lockfile.write(msg['acquired'][lock])
                        lock.lockfile.flush()
                    finally:
                        lock.release()
                        logger.info(msg['released'][lock])
                else:
                    logger.info(msg['not acquired'][lock])
                    logger.info(filler_after_noack)
    finally:
        per_thread_lockfile.close()


def for_subprocess(proc_i, subthreads, logrecords, locktests, filename):

    # setting up logging to test FLockFileHandler
    f = logging.Formatter(LOG_FORMAT)
    h = FLockFileHandler(filename)
    h.setFormatter(f)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(h)

    # (standalone FLockRLock instance also to be tested)
    thread_shared_lockfile = open(abspath(filename), 'a')
    try:
        thread_shared_lock = FLockRLock(thread_shared_lockfile)
        threads = [threading.Thread(target=for_subthread,
                                    args=(thread_shared_lock,
                                          thread_i, proc_i,
                                          logrecords, locktests,
                                          filename))
                   for thread_i in irange(subthreads)]
        for t in threads: t.start()
        for t in threads: t.join()  # wait for subthreads
    finally:
        thread_shared_lockfile.close()


def check_records_only(filename):
    logfile = open(abspath(filename))
    try:
        try:
            badline = inext(filterfalse(RECORD_REGEX.match, logfile))
        except StopIteration:
            return "OK"
        else:
            sys.exit('Invalid record found: %s' % badline)
    finally:
        logfile.close()


def check_records_and_len(filename, expected_len):
    logfile = open(abspath(filename))
    try:
        # Py2.4-compatibile fast way to check file content and length
        file_ending = islice(takewhile(RECORD_REGEX.match, logfile),
                             expected_len - 1, expected_len + 1)
        try:
            inext(file_ending)
        except StopIteration:
            sys.exit('Too few valid lines found (%d expected)'
                     % expected_len)
        # at this point the file content should have been read entirely
        try:
            inext(file_ending)
        except StopIteration:
            return "OK"
        else:
            sys.exit('Too many valid (?) lines found (%d expected)'
                     % expected_len)
    finally:
        logfile.close()


def main(subprocs=3, subthreads=3, logrecords=5000,
         locktests=500, firstdelete=1, filename=DEFAULT_FILENAME):

    # args may origin from command line, so we map it to int
    (subprocs, subthreads, logrecords, firstdelete, locktests
    ) = map(int, (subprocs, subthreads, logrecords, firstdelete, locktests))

    # expected number of generated log records
    expected_len = subprocs * subthreads * (logrecords + (4 * locktests))

    if firstdelete:
        try:
            os.remove(abspath(filename))
        except OSError:
            pass

    for proc_i in irange(subprocs):
        if not os.fork():
            # we are in a subprocess
            for_subprocess(proc_i, subthreads, logrecords,
                           locktests, filename)
            break
    else:
        # we are in the parent process
        for i in irange(subprocs):
            os.wait()  # wait for subprocesses

        # finally, check the resulting log file content
        if firstdelete:
            print(check_records_and_len(filename, expected_len))
        else:
            print(check_records_only(filename))


# try running the script simultaneously using different Python versions :)
if __name__ == '__main__':
    main(*sys.argv[1:])

2 comments

Cristiano 13 years, 7 months ago  # | flag

Very interesting. I faced unwanted log-rollovers using a rotating log handler shared by many concurrent process and this could be the solution, but it should be subclassed from (Timed)RotatingFileHandler, do u think it's possible?

Thanks! Cri

Jan Kaliszewski (author) 13 years, 6 months ago  # | flag

Hi, Cristiano.

It'd be possible after some (deeper) improvements, not just like that, unfortunately. This receipe was deliberately written as (possibly) simple.

You may be interested in ConcurrentLogHandler (written by Lowell Alleman) -- see: http://pypi.python.org/pypi/ConcurrentLogHandler/0.8.4 -- being (also fairly simple) replacement for RotatingFileHandler.