
I needed a simple logging solution that I could use with concurrent.futures.ProcessPoolExecutor, and this is my initial recipe.

Python
#!/usr/bin/python3
import sys
import os
import logging
import traceback
import time
import random
import inspect
import concurrent.futures
import warnings

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

FMT_PID = '{asctime} {name} {process} {levelname} {message}'
logging.basicConfig(format=FMT_PID, style='{', level=logging.DEBUG)
#logging.basicConfig(filename='example.log', format=FMT_PID, style='{', level=logging.DEBUG)
LOGGER_NAME = 'mylogger'
_log = logging.getLogger(LOGGER_NAME)


class LM(object):  # allows str.format log messages to be used
    def __init__(self, msg, *args, **kwargs):
        self.msg = msg
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.msg.format(*self.args, **self.kwargs)


class NondurableLogger(object):
    """ This class supports a subset of logging.Logger features for use with concurrent.futures.ProcessPoolExecutor.
   
    Usage:
    
    The functions executed by the ProcessPoolExecutor's submit and map methods should create an instance of this
    class, use its methods to log messages, and return it along with their result.  The supervising process is
    responsible for handling the worker process log records - normally this is done by passing an appropriate
    Logger to the NondurableLogger.handle method.

    Although the stored log records are perishable, reasonable durability can be obtained by judicious use of
    exception handling in the functions executed by the ProcessPoolExecutor submit and map methods.
    
    Notes:

        1) threading.Lock cannot be pickled
        2) this class makes no promise of thread safety

    """

    def __init__(self, name, *, capacity=1000):
        if not isinstance(capacity, int):
            raise TypeError('capacity must be an integer')

        if capacity <= 0:
            raise ValueError('capacity must be a positive integer')

        self.name = name
        self._capacity = capacity
        self._records = []
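        # use the current process-wide LogRecord factory so the stored
        # records behave like ordinary logging records when replayed later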
        self._make_record = logging.getLogRecordFactory()

    def log_(self, lvl, msg, *args, **kwargs):
        if not isinstance(lvl, int):
            raise TypeError('level must be an integer')

        if len(self._records) >= self._capacity:
            warnings.warn('{0} at capacity, dropping record (pid {1})'.format(type(self).__qualname__, os.getpid()))
        else:
            if not isinstance(msg, str):
                msg = str(msg)

            # walk two frames back, past the debug/info/... wrapper method,
            # so the record points at the real call site
            caller = inspect.currentframe().f_back.f_back
            fname, lineno, *_ = inspect.getframeinfo(caller)
            exc_info = sys.exc_info() if kwargs.get('exc_info') else None

            if exc_info is not None:
                # fold the formatted traceback into the message here, since
                # traceback objects cannot be pickled back to the supervisor
                tb = ''.join(traceback.format_exception(*exc_info))
                msg = '\n'.join((msg, tb))

            rec = self._make_record(self.name, lvl, fname, lineno, msg, args, None)
            self._records.append(rec)

    def debug(self, msg, *args):
        self.log_(logging.DEBUG, msg, *args)

    def info(self, msg, *args):
        self.log_(logging.INFO, msg, *args)

    def warning(self, msg, *args):
        self.log_(logging.WARNING, msg, *args)

    def error(self, msg, *args):
        self.log_(logging.ERROR, msg, *args)

    def critical(self, msg, *args):
        self.log_(logging.CRITICAL, msg, *args)

    def exception(self, msg, *args):
        self.log_(logging.ERROR, msg, *args, exc_info=True)

    def handle(self, target):
        if not isinstance(target, logging.Logger):
            raise TypeError('target must be a logging.Logger')

        for rec in self._records:
            target.handle(rec)

    def records(self):
        return self._records


def do_work(url, doze):
    result = None
    ndl = NondurableLogger(LOGGER_NAME)

    try:
        ndl.info(LM('converting {0}', url))
        result = url.replace('http', 'https')
        ndl.debug(LM('sleeping for {0}', doze))
        time.sleep(doze)

        if doze > 3:
            ndl.warning('very slow')

        # doze may be 0 (main passes random.randint(0, 5)), so this division
        # can raise ZeroDivisionError and exercise exception() below
        num = len(result)/doze
        ndl.debug(LM('quotient {0}', num))
    except Exception:
        ndl.exception(LM('division by {0}', doze))

    return result, ndl


def main():
    _log.info('start')
    workers = 5

    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        cf_doze = [random.randint(0, 5) for _ in URLS]
        args = (URLS, cf_doze)

        for url, ndl in executor.map(do_work, *args):
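            # replay the records captured in the worker process through the local logger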
            ndl.handle(_log)
            _log.info(LM('>> {0}', url))

    _log.info('end')


if __name__ == '__main__':
    main()
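
For reference, the same pattern also works with submit and as_completed. Below is a minimal sketch that reuses the recipe's do_work, URLS, LM and _log; the function name main_submit is mine, not part of the recipe:

def main_submit():
    # sketch only: a submit/as_completed variant of main() above
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(do_work, url, random.randint(0, 5)) for url in URLS]

        for fut in concurrent.futures.as_completed(futures):
            url, ndl = fut.result()  # do_work returns (result, NondurableLogger)
            ndl.handle(_log)
            _log.info(LM('>> {0}', url))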

Although I have an idea for a more Pythonic logging solution for use with concurrent.futures.ProcessPoolExecutor, I haven't had time to implement and test it. This solution has worked reasonably well for me so far, provided you're comfortable with its limitations.
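
For comparison only, and not necessarily the idea alluded to above: the standard library can also route worker records through a shared queue with logging.handlers.QueueHandler and QueueListener. A rough sketch, assuming Python 3.7+ for ProcessPoolExecutor's initializer argument; worker_init, convert and queue_main are hypothetical names:

import multiprocessing
import logging.handlers

def worker_init(queue):
    # runs once per worker process: replace any inherited handlers so
    # records flow only through the shared queue
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)

def convert(url):
    # plain logging calls work here because worker_init installed a QueueHandler
    logging.getLogger(LOGGER_NAME).info('converting %s', url)
    return url.replace('http', 'https')

def queue_main():
    # a Manager queue is picklable, so it can be passed via initargs
    queue = multiprocessing.Manager().Queue(-1)
    listener = logging.handlers.QueueListener(queue, *logging.getLogger().handlers)
    listener.start()

    try:
        with concurrent.futures.ProcessPoolExecutor(
                max_workers=5, initializer=worker_init, initargs=(queue,)) as executor:
            for result in executor.map(convert, URLS):
                _log.info(LM('>> {0}', result))
    finally:
        listener.stop()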