I needed a simple logging solution that I could use with concurrent.futures.ProcessPoolExecutor, and this is my initial recipe.
#!/usr/bin/python3
import sys
import os
import logging
import traceback
import time
import random
import inspect
import concurrent.futures
import warnings
# Demo inputs: one worker task is submitted per URL.
URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]

# Record layout ('{'-style); {process} shows which process created each record.
FMT_PID = '{asctime} {name} {process} {levelname} {message}'

# Name shared by the supervising logger and the per-worker record buffers.
LOGGER_NAME = 'mylogger'

# Configure the root logger at import time.
# To log to a file instead, use:
# logging.basicConfig(filename='example.log', format=FMT_PID, style='{', level=logging.DEBUG)
logging.basicConfig(format=FMT_PID, style='{', level=logging.DEBUG)

_log = logging.getLogger(LOGGER_NAME)
class LM(object):
    """Log message wrapper that allows str.format templates to be used.

    The template and its arguments are stored as given; formatting happens
    only when the object is converted with str().
    """

    def __init__(self, msg, *args, **kwargs):
        self.msg, self.args, self.kwargs = msg, args, kwargs

    def __str__(self):
        template = self.msg
        return template.format(*self.args, **self.kwargs)
class NondurableLogger(object):
    """Buffer log records in a worker process for replay by the supervisor.

    This class supports a subset of logging.Logger features for use with
    concurrent.futures.ProcessPoolExecutor.

    Usage:
        The functions executed by the ProcessPoolExecutor's submit and map
        methods should create an instance of this class, use its methods to
        log messages, and return it along with their result. The supervising
        process is responsible for handling the worker process log records -
        normally this is done by passing an appropriate Logger to the
        NondurableLogger.handle method.

        Although the stored log records are perishable, reasonable durability
        can be obtained by judicious use of exception handling in the
        functions executed by the ProcessPoolExecutor submit and map methods.

    Notes:
        1) threading.Lock cannot be pickled
        2) this class makes no promise of thread safety
        3) tracebacks are stored as text in LogRecord.exc_text (never as
           exc_info), so they are appended by the formatter and are not
           subject to %-formatting with the message arguments
    """

    # Number of frames between _find_caller and the code that invoked a public
    # logging method: _find_caller <- _record <- debug/info/.../log_ <- caller.
    _CALLER_DEPTH = 3

    def __init__(self, name, *, capacity=1000):
        """Create an empty buffer.

        Args:
            name: logger name stamped on every record.
            capacity: maximum number of records kept; must be a positive int.

        Raises:
            TypeError: if capacity is not an integer.
            ValueError: if capacity is not positive.
        """
        if not isinstance(capacity, int):
            raise TypeError('capacity must be an integer')
        if capacity <= 0:
            raise ValueError('capacity must be a positive integer')
        self.name = name
        self._capacity = capacity
        self._records = []
        self._make_record = logging.getLogRecordFactory()

    def _find_caller(self):
        """Return (filename, lineno, function name) of the logging call site."""
        frame = inspect.currentframe()
        try:
            for _ in range(self._CALLER_DEPTH):
                if frame is None:
                    break
                frame = frame.f_back
            if frame is None:
                # No frame support, or the stack is shallower than expected.
                return '(unknown file)', 0, '(unknown function)'
            code = frame.f_code
            return code.co_filename, frame.f_lineno, code.co_name
        finally:
            # Drop the frame reference promptly to avoid reference cycles.
            del frame

    def _record(self, lvl, msg, args, exc_info=False):
        """Build one LogRecord and buffer it; shared by all public methods.

        Every public logging method calls this helper directly so that the
        call site is always the same number of frames away.
        """
        if not isinstance(lvl, int):
            raise TypeError('level must be an integer')
        if len(self._records) >= self._capacity:
            warnings.warn('{0} capacity is full {1}'.format(type(self).__qualname__, os.getpid()))
            return
        if not isinstance(msg, str):
            # Render eagerly so that only plain text is stored in the record.
            msg = str(msg)
        fname, lineno, func = self._find_caller()
        rec = self._make_record(self.name, lvl, fname, lineno, msg, args, None, func=func)
        if exc_info:
            # Keep the traceback out of msg: msg is later %-formatted with
            # args, and a '%' inside traceback text would break that step.
            text = ''.join(traceback.format_exception(*sys.exc_info()))
            rec.exc_text = text[:-1] if text.endswith('\n') else text
        self._records.append(rec)

    def log_(self, lvl, msg, *args, **kwargs):
        """Buffer a record at integer level lvl.

        Only the 'exc_info' keyword is recognised; when it is true the
        traceback of the exception currently being handled is attached.
        When the buffer is full a warning is issued and the message dropped.

        Raises:
            TypeError: if lvl is not an integer.
        """
        self._record(lvl, msg, args, kwargs.get('exc_info'))

    def debug(self, msg, *args):
        """Buffer a DEBUG record."""
        self._record(logging.DEBUG, msg, args)

    def info(self, msg, *args):
        """Buffer an INFO record."""
        self._record(logging.INFO, msg, args)

    def warning(self, msg, *args):
        """Buffer a WARNING record."""
        self._record(logging.WARNING, msg, args)

    def error(self, msg, *args):
        """Buffer an ERROR record."""
        self._record(logging.ERROR, msg, args)

    def critical(self, msg, *args):
        """Buffer a CRITICAL record."""
        self._record(logging.CRITICAL, msg, args)

    def exception(self, msg, *args):
        """Buffer an ERROR record with the current exception's traceback."""
        self._record(logging.ERROR, msg, args, exc_info=True)

    def handle(self, target):
        """Replay every buffered record, in order, through target.

        NOTE(review): per the logging documentation Logger.handle applies the
        target's filters but not its level threshold - confirm that replaying
        records below the target's level is intended.

        Raises:
            TypeError: if target is not a logging.Logger.
        """
        if not isinstance(target, logging.Logger):
            raise TypeError('target must be a logging.Logger')
        for rec in self._records:
            target.handle(rec)

    def records(self):
        """Return the internal list of buffered records (not a copy)."""
        return self._records
def do_work(url, doze):
    """Demo worker: rewrite url's scheme text, sleep, and log along the way.

    Messages go to a fresh NondurableLogger which is returned together with
    the converted URL so the supervising process can replay them. A doze of
    0 makes the division fail; the exception is caught and recorded through
    the logger's exception method.

    Returns:
        (converted_url_or_None, NondurableLogger)
    """
    worker_log = NondurableLogger(LOGGER_NAME)
    converted = None
    try:
        worker_log.info(LM('converting {0}', url))
        converted = url.replace('http', 'https')
        worker_log.debug(LM('sleeping for {0}', doze))
        time.sleep(doze)
        if doze > 3:
            worker_log.warning('very slow')
        quotient = len(converted) / doze
        worker_log.debug(LM('quotient {0}', quotient))
    except Exception:
        # Catch everything so the buffered records still reach the supervisor.
        worker_log.exception(LM('division by {0}', doze))
    return converted, worker_log
def main():
    """Run do_work for every URL in a process pool and replay worker logs.

    Each task gets a random sleep time between 0 and 5 seconds. Results are
    consumed in submission order; the records returned by each worker are
    passed to the module logger before the converted URL is reported.
    """
    _log.info('start')
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as pool:
        dozes = [random.randint(0, 5) for _ in URLS]
        for converted, worker_log in pool.map(do_work, URLS, dozes):
            worker_log.handle(_log)
            _log.info(LM('>> {0}', converted))
    _log.info('end')


# Start the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
Although I have an idea for a more Pythonic logging solution for use with concurrent.futures.ProcessPoolExecutor, I've not had time to implement and test it. This solution, if you're comfortable with its limitations, has worked reasonably well for me so far.
Tags: logging, multiprocessing