#!/usr/bin/python3
import sys
import os
import logging
import traceback
import time
import random
import inspect
import concurrent.futures
import warnings
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
FMT_PID = '{asctime} {name} {process} {levelname} {message}'
logging.basicConfig(format=FMT_PID, style='{', level=logging.DEBUG)
#logging.basicConfig(filename='example.log', format=FMT_PID, style='{', level=logging.DEBUG)
LOGGER_NAME = 'mylogger'
_log = logging.getLogger(LOGGER_NAME)
class LM:
    """Log message whose text is produced lazily with ``str.format``.

    Wrapping a template and its arguments in an ``LM`` lets ``{}``-style
    placeholders be used with loggers: the template is only expanded when the
    object is converted to a string.
    """

    def __init__(self, msg, *args, **kwargs):
        """Remember the format template and the values to substitute."""
        self.msg, self.args, self.kwargs = msg, args, kwargs

    def __str__(self):
        """Return the template expanded with the stored arguments."""
        template = self.msg
        return template.format(*self.args, **self.kwargs)
class NondurableLogger(object):
    """Buffer log records in a worker so the supervising process can emit them.

    This class supports a subset of logging.Logger features for use with
    concurrent.futures.ProcessPoolExecutor.

    Usage:
        The functions executed by the ProcessPoolExecutor's submit and map
        methods should create an instance of this class, use its methods to
        log messages, and return it along with their result. The supervising
        process is responsible for handling the worker process log records -
        normally this is done by passing an appropriate Logger to the
        NondurableLogger.handle method.

        Although the stored log records are perishable, reasonable durability
        can be obtained by judicious use of exception handling in the
        functions executed by the ProcessPoolExecutor submit and map methods.

    Notes:
        1) threading.Lock cannot be pickled
        2) this class makes no promise of thread safety
    """

    def __init__(self, name, *, capacity=1000):
        """Create an empty record buffer.

        Args:
            name: logger name stamped on every record that is created.
            capacity: maximum number of records kept; further messages are
                dropped with a warning.

        Raises:
            TypeError: if capacity is not an integer.
            ValueError: if capacity is not positive.
        """
        if not isinstance(capacity, int):
            raise TypeError('capacity must be an integer')
        if capacity <= 0:
            raise ValueError('capacity must be a positive integer')
        self.name = name
        self._capacity = capacity
        self._records = []
        self._make_record = logging.getLogRecordFactory()

    def _find_caller(self):
        """Return (filename, lineno) of the code that requested the record.

        Every frame that is running a method of this very logger (this
        helper, log_, and convenience wrappers such as info) is skipped, so
        the result is right whether log_ is called directly or via a wrapper.
        """
        frame = inspect.currentframe()
        try:
            while frame is not None and frame.f_locals.get('self') is self:
                frame = frame.f_back
            if frame is None:
                # No usable frame (e.g. no frame support in the interpreter).
                return '(unknown file)', 0
            info = inspect.getframeinfo(frame, context=0)
            return info.filename, info.lineno
        finally:
            # Drop the frame reference promptly to avoid reference cycles.
            del frame

    @staticmethod
    def _resolve_exc_info(exc_info):
        """Normalise an exc_info argument to a (type, value, tb) tuple or None.

        Accepts a false value (no exception), an exception instance, an
        exc_info tuple, or any other true value meaning "use the exception
        currently being handled".
        """
        if not exc_info:
            return None
        if isinstance(exc_info, BaseException):
            return (type(exc_info), exc_info, exc_info.__traceback__)
        if not isinstance(exc_info, tuple):
            exc_info = sys.exc_info()
        # (None, None, None) means no exception is being handled.
        return exc_info if exc_info[0] is not None else None

    def log_(self, lvl, msg, *args, **kwargs):
        """Store a record for msg at integer level lvl.

        Args:
            lvl: numeric logging level (e.g. logging.INFO).
            msg: message or any object convertible with str().
            *args: %-style arguments, merged into msg lazily by the record.
            **kwargs: only ``exc_info`` is consulted; when it designates an
                exception, its formatted traceback is appended to the message.

        Raises:
            TypeError: if lvl is not an integer.

        When the buffer is full the message is dropped and a warning issued.
        """
        if not isinstance(lvl, int):
            raise TypeError('level must be an integer')
        if len(self._records) >= self._capacity:
            warnings.warn('{0} capacity is full {1}'.format(type(self).__qualname__, os.getpid()))
            return
        if not isinstance(msg, str):
            msg = str(msg)
        fname, lineno = self._find_caller()
        exc_info = self._resolve_exc_info(kwargs.get('exc_info'))
        if exc_info is not None:
            # The traceback is stored as text inside the message, so the
            # record carries no traceback object (the record factory is
            # called with exc_info=None below).
            tb = ''.join(traceback.format_exception(*exc_info))
            if args:
                # The record applies "msg % args" later; escape '%' in the
                # traceback so it cannot be mistaken for a format directive.
                tb = tb.replace('%', '%%')
            msg = '\n'.join((msg, tb))
        rec = self._make_record(self.name, lvl, fname, lineno, msg, args, None)
        self._records.append(rec)

    def debug(self, msg, *args, **kwargs):
        """Store a DEBUG record; see log_ for the arguments."""
        self.log_(logging.DEBUG, msg, *args, **kwargs)

    def info(self, msg, *args, **kwargs):
        """Store an INFO record; see log_ for the arguments."""
        self.log_(logging.INFO, msg, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        """Store a WARNING record; see log_ for the arguments."""
        self.log_(logging.WARNING, msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """Store an ERROR record; see log_ for the arguments."""
        self.log_(logging.ERROR, msg, *args, **kwargs)

    def critical(self, msg, *args, **kwargs):
        """Store a CRITICAL record; see log_ for the arguments."""
        self.log_(logging.CRITICAL, msg, *args, **kwargs)

    def exception(self, msg, *args, **kwargs):
        """Store an ERROR record including the traceback being handled."""
        kwargs.setdefault('exc_info', True)
        self.log_(logging.ERROR, msg, *args, **kwargs)

    def handle(self, target):
        """Pass every stored record, in order, to target.handle().

        Raises:
            TypeError: if target is not a logging.Logger.
        """
        # Explicit check instead of assert, which is removed under -O.
        if not isinstance(target, logging.Logger):
            raise TypeError('target must be a logging.Logger')
        for rec in self._records:
            target.handle(rec)

    def records(self):
        """Return the list of stored records (the internal list itself)."""
        return self._records
def do_work(url, doze):
    """Demo task run in a worker process.

    Rewrites 'http' to 'https' in url, sleeps for doze seconds, then divides
    the length of the rewritten url by doze. Every step is logged to a
    private NondurableLogger; any Exception raised along the way is recorded
    with its traceback instead of propagating.

    Returns:
        A (converted_url, logger) tuple. converted_url is None if the failure
        happened before the url was rewritten.
    """
    worker_log = NondurableLogger(LOGGER_NAME)
    converted = None
    try:
        worker_log.info(LM('converting {0}', url))
        converted = url.replace('http', 'https')
        worker_log.debug(LM('sleeping for {0}', doze))
        time.sleep(doze)
        if doze > 3:
            worker_log.warning('very slow')
        # A doze of 0 makes this division raise, which exercises the
        # exception-logging path below.
        quotient = len(converted) / doze
        worker_log.debug(LM('quotient {0}', quotient))
    except Exception:
        worker_log.exception(LM('division by {0}', doze))
    return converted, worker_log
def main():
    """Run do_work over URLS in a process pool and replay the worker logs.

    Each URL is paired with a random sleep time between 0 and 5 seconds.
    Results are consumed in submission order; for every result the worker's
    buffered records are passed to the module logger, followed by a line
    showing the converted URL.
    """
    _log.info('start')
    pool_size = 5
    with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as pool:
        naps = [random.randint(0, 5) for _ in URLS]
        outcomes = pool.map(do_work, URLS, naps)
        for converted, worker_log in outcomes:
            worker_log.handle(_log)
            _log.info(LM('>> {0}', converted))
    _log.info('end')
# Run the demo only when this file is executed as a script. The guard keeps
# main() from running when the module is merely imported (which worker
# processes may do, depending on the multiprocessing start method).
if __name__ == '__main__':
    main()
# Diff to Previous Revision
# --- revision 1 2014-02-08 15:52:08
# +++ revision 2 2014-02-10 17:50:59
# @@ -56,7 +56,7 @@
# if not isinstance(capacity, int):
# raise TypeError('capacity must be an integer')
# - if capacity < 0:
# + if capacity <= 0:
# raise ValueError('capacity must be a positive integer')
# self.name = name