#! /usr/bin/env python
######################################################################
# Copyright (c) by Kevin L. Sitze on 2011-01-23.
# This code may be used pursuant to the MIT License.
######################################################################
"""This package provides facilities to attach decorators to classes or
modules (possibly recursively).  A tracing decorator is provided for
tracing function and method calls in your applications.

Usage:

    from decorators import *

    @trace
    def function(...):
        '''Output goes to logging.getLogger(__module__),
        where __module__ is the name of the module in which
        the function is defined.  If the current module is '__main__'
        then the root logger will be used.
        '''
        pass

    @trace(logger)
    def function(...):
        '''Output goes to the specified logger instance.
        '''
        pass

    @trace("Logger")
    def function(...):
        '''Output goes to the named logger instance.
        '''
        pass

    class ClassName(object):
        '''Output goes to the ClassName logger.  If ClassName is
        not in the '__main__' module then the name of its module
        will be prefixed to the classname (separated by a dot).
        All methods on the class will be traced.
        '''
        __logger__ = loggerOrLoggerName # optional
        __metaclass__ = decorators.TraceMetaClass

        def method(...):
            pass

    class PostDecorate(object):
        def method(...):
            pass

    # attach a decorator to an existing class.
    attach(trace(logger), PostDecorate)

You can also attach a decorator to an existing module.
"""
import inspect
import logging
import sys
import thread
import types
from functools import wraps
from itertools import chain
# Types that the trace decorator treats as plain functions, i.e.
# callables that do not belong to a class.  Building the tuple from a
# set drops duplicates in case two names in ``types`` refer to the same
# type object; the resulting order is irrelevant because the tuple is
# only used for membership tests.
FunctionTypes = tuple(set((
    types.BuiltinFunctionType,
    types.FunctionType
)))
# Types that the trace decorator treats as methods; for these it looks
# at ``im_self`` / ``im_class`` / ``im_func`` to find the owning class
# and the underlying function.
MethodTypes = tuple(set((
    types.BuiltinMethodType,
    types.MethodType,
    types.UnboundMethodType
)))
class _C(object):
    # Throw-away class used only to discover the types of the descriptor
    # objects created by @classmethod, @staticmethod and @property.
    # The type() calls are made inside the class body, immediately after
    # each definition, so they see the raw descriptor object stored in
    # the class namespace rather than what attribute access on the class
    # would return.
    @classmethod
    def classMethod(klass): pass
    classMethodType = type(classMethod)
    @staticmethod
    def staticMethod(): pass
    staticMethodType = type(staticMethod)
    @property
    def propertyMethod(self): pass
# Module-level aliases for the descriptor types captured above; these
# are compared against type(v) for each entry of a class __dict__ in
# attachToClass.
ClassMethodType = _C.classMethodType
StaticMethodType = _C.staticMethodType
PropertyType = type(_C.propertyMethod)
# Types for which trace() assumes it was applied directly to a callable
# (``@trace`` with no arguments) rather than being given a logger or a
# logger name.
CallableTypes = tuple(set((
    types.BuiltinFunctionType,
    types.FunctionType,
    types.BuiltinMethodType,
    types.MethodType,
    types.UnboundMethodType,
    ClassMethodType
)))
# Names exported by ``from <this module> import *``.  Everything else
# defined in this file (helpers, formatter defaults, type tables) is
# only reachable through an explicit attribute access on the module.
__all__ = (
    'ThreadLocal',
    'TraceMetaClass',
    'attach',
    'getFormatter',
    'setFormatter',
    'getLoggerFactory',
    'setLoggerFactory',
    'trace'
)
######################################################################
# Utility functions
######################################################################
# Longest repr() that chop() will return unabridged.
MAX_SIZE = 320
def chop(value):
    """Return repr(value), abbreviated when it is longer than MAX_SIZE.

    An abbreviated result consists of the first MAX_SIZE characters of
    the repr, an ellipsis ('...') and the final character of the repr,
    so that a closing quote or bracket is retained.
    """
    text = repr(value)
    if len(text) <= MAX_SIZE:
        return text
    head, tail = text[:MAX_SIZE], text[-1]
    return ''.join((head, '...', tail))
def loggable(obj):
    """Tell whether obj can be used as a logger by the 'trace' decorator.

    Returns True when obj is an instance of the logging module's
    current logger class.  Otherwise obj must be a class whose 'debug',
    'isEnabledFor' and 'setLevel' attributes are all methods; anything
    else yields False.
    """
    if isinstance(obj, logging.getLoggerClass()):
        return True
    if not inspect.isclass(obj):
        return False
    # Every method of the minimal Logger API has to be present.
    for attr in ('debug', 'isEnabledFor', 'setLevel'):
        if not inspect.ismethod(getattr(obj, attr, None)):
            return False
    return True
######################################################################
# LoggerFactory Property
######################################################################
# The object whose getLogger(name) is used to create loggers; the
# logging module itself unless setLoggerFactory() replaced it.
_logger_factory = logging
def getLoggerFactory():
    """Return the factory object currently used to create loggers.

    Unless setLoggerFactory() has been called this is the logging
    module.
    """
    return _logger_factory
def setLoggerFactory(factory):
    """Install factory as the object used to create loggers.

    The factory must publish a method or class named 'getLogger' that
    takes a string parameter naming the logger instance to retrieve.
    Logger objects returned by the factory must, at a minimum, expose
    the methods 'isEnabledFor' and 'debug'.
    """
    global _logger_factory
    _logger_factory = factory
######################################################################
# class PrependLoggerFactory
######################################################################
class PrependLoggerFactory(object):
    """Logger factory for the "trace" decorator that puts a fixed,
    user specified prefix in front of the name of every logger it is
    asked for.
    """
    def __init__(self, prefix = 'trace'):
        """Create a factory that prepends prefix (with leading and
        trailing dots removed) to the name of each logger it creates.
        """
        self.__prefix = None
        # Go through the property so the dots are stripped in one place.
        self.prefix = prefix
    def _get_prefix(self):
        return self.__prefix
    def _set_prefix(self, value):
        self.__prefix = value.strip('.')
    prefix = property(
        _get_prefix,
        _set_prefix,
        doc = """The value to prefix to each logger created by this factory.
"""
    )
    def getLogger(self, name):
        """Return logging.getLogger('<prefix>.<name>')."""
        qualified = '.'.join([self.__prefix, name])
        return logging.getLogger(qualified)
######################################################################
# class ThreadLocal
######################################################################
class ThreadLocal(object):
    """Instances of this class provide a thread-local variable.

    'value' is private to the thread that reads or writes it.  A thread
    that has never assigned 'value' sees 'initialValue', which is shared
    by all threads.

    Per-thread storage is delegated to threading.local so that a
    thread's value disappears together with the thread.  Keying a dict
    by thread id instead would keep values of finished threads forever
    and hand them to any later thread that is given a recycled id.
    """
    def __init__(self):
        # Imported here so the class does not depend on the module's
        # import block.
        import threading
        self.__lock = threading.Lock()      # guards __init only
        self.__local = threading.local()    # holds the per-thread 'value'
        self.__init = None
    @property
    def value(self):
        """The calling thread's value, or 'initialValue' if the calling
        thread has not assigned one yet.
        """
        try:
            return self.__local.value
        except AttributeError:
            with self.__lock:
                return self.__init
    @value.setter
    def value(self, value):
        # No lock needed: the storage is only visible to this thread.
        self.__local.value = value
    @property
    def initialValue(self):
        """The value seen by threads that have not assigned 'value'."""
        with self.__lock:
            return self.__init
    @initialValue.setter
    def initialValue(self, value):
        with self.__lock:
            self.__init = value
######################################################################
# Formatter functions
######################################################################
def _formatter_self(name, value):
    """Render the instance argument of a method call.

    name is the parameter name (normally "self") and value the bound
    instance.  The result looks like the default object repr; the
    module name is left out for objects whose module is '__main__'.
    """
    module_name = value.__module__
    class_name = value.__class__.__name__
    if module_name == '__main__':
        return '%s = <%s object at 0x%x>' % (name, class_name, id(value))
    return '%s = <%s.%s object at 0x%x>' % (name, module_name, class_name, id(value))
def _formatter_class(name, value):
    """Format the "klass" variable and value on class methods.

    name is the parameter name and value the class object the method is
    bound to.  Returns "name = <class 'module.ClassName'>"; the module
    part is omitted for classes defined in '__main__'.

    The format strings must contain one placeholder per argument:
    with only the single '%s' for the name, the '%' operator raises
    TypeError ("not all arguments converted") on every call.
    """
    __mname = value.__module__
    if __mname != '__main__':
        return "%s = <class '%s.%s'>" % (name, __mname, value.__name__)
    else:
        return "%s = <class '%s'>" % (name, value.__name__)
def _formatter_named(name, value):
    """Render a named argument as 'name = <abbreviated repr of value>'."""
    shown = chop(value)
    return '%s = %s' % (name, shown)
def _formatter_defaults(name, value):
    """Render a defaulted argument as '[name = <abbreviated repr>]';
    the square brackets set it apart from explicitly named arguments.
    """
    shown = chop(value)
    return '[%s = %s]' % (name, shown)
# Active formatter functions.  The trace wrapper reads these module
# globals on every traced call, so rebinding one of them (see
# setFormatter) takes effect immediately.
af_self = _formatter_self
af_class = _formatter_class
af_named = _formatter_named
af_default = _formatter_defaults
af_unnamed = chop
af_keyword = _formatter_named

# Every accepted formatter name, mapped to the module global ("slot")
# that holds the corresponding formatter.  The anonymous formatter lives
# in 'af_unnamed' because that is the name the trace wrapper calls.
_FORMATTER_SLOTS = {
    'self': 'af_self', 'instance': 'af_self', 'this': 'af_self',
    'class': 'af_class',
    'named': 'af_named', 'param': 'af_named', 'parameter': 'af_named',
    'default': 'af_default', 'optional': 'af_default',
    'anonymous': 'af_unnamed', 'arbitrary': 'af_unnamed', 'unnamed': 'af_unnamed',
    'keyword': 'af_keyword', 'pair': 'af_keyword', 'pairs': 'af_keyword',
}

# The built-in formatter of each slot, restored by setFormatter(name, None).
_FORMATTER_DEFAULTS = {
    'af_self': _formatter_self,
    'af_class': _formatter_class,
    'af_named': _formatter_named,
    'af_default': _formatter_defaults,
    'af_unnamed': chop,
    'af_keyword': _formatter_named,
}

def _formatter_slot(name):
    """Return the module global that stores the formatter called name.

    Raises ValueError for a name that is not a known formatter name
    (including unhashable objects).
    """
    try:
        return _FORMATTER_SLOTS[name]
    except (KeyError, TypeError):
        raise ValueError('unknown trace formatter %r' % (name,))

def getFormatter(name):
    """Return the formatter function currently installed under name.

    name is any of the formatter names accepted by setFormatter.
    Raises ValueError if name is not one of them.
    """
    return globals()[_formatter_slot(name)]

def setFormatter(name, func):
    """Replace the formatter function used by the trace decorator to
    handle formatting a specific kind of argument.  There are several
    kinds of arguments that trace discriminates between:

    * instance argument - the object bound to an instance method.
    * class argument - the class object bound to a class method.
    * positional arguments (named) - values bound to distinct names.
    * positional arguments (default) - named positional arguments with
      default values specified in the function declaration.
    * positional arguments (anonymous) - an arbitrary number of values
      that are all bound to the '*' variable.
    * keyword arguments - zero or more name-value pairs that are
      placed in a dictionary and bound to the double-star variable.

    name - specifies the name of the formatter to be modified.

    * instance argument - "self", "instance" or "this"
    * class argument - "class"
    * named argument - "named", "param" or "parameter"
    * default argument - "default", "optional"
    * anonymous argument - "anonymous", "arbitrary" or "unnamed"
    * keyword argument - "keyword", "pair" or "pairs"

    func - a function to format an argument.

    * For all but anonymous formatters this function must accept two
      arguments: the variable name and the value to which it is bound.
    * The anonymous formatter function is passed only one argument
      corresponding to an anonymous value.
    * If func is None then the default formatter will be used.

    Raises ValueError if name is not one of the names listed above.
    """
    slot = _formatter_slot(name)
    globals()[slot] = _FORMATTER_DEFAULTS[slot] if func is None else func
######################################################################
# Decorator: trace
######################################################################
# Prototypes of builtin functions, which expose no code object from
# which parameter names could be read.  Syntax: name(param[=default],...)
# where every default must be a key of __builtin_defaults.
__builtins = (
    '__import__(name,globals={},locals={},fromlist=[],level=-1)',
    'abs(number)',
    'all(iterable)',
    'any(iterable)',
    'apply(object,args=[],kwargs={})',
    'bin(number)',
    'callable(object)',
    'chr(i)',
    'cmp(x,y)',
    'coerce(x,y)',
    'compile(source,filename,mode,flags=0,dont_inherit=0)',
    'delattr(object,name)',
    'dir()',
    'divmod(x,y)',
    'eval(source,globals={},locals={})',
    'execfile(filename,globals={},locals={})',
    'filter(function,sequence)',
    'format(value,format_spec="")',
    'getattr(object,name)',
    'globals()',
    'hasattr(object,name)',
    'hash(object)',
    'hex(number)',
    'id(object)',
    'input(prompt=None)',
    'intern(string)',
    'isinstance(object,klass)',
    'issubclass(C,B)',
    'iter(collection)',
    'len(object)',
    'locals()',
    'map(function,sequence)',
    'max(iterable,key=None)',
    'min(iterable,key=None)',
    'next(iterator)',
    'oct(number)',
    'open(name,mode=0666,buffering=True)',
    'ord(c)',
    'pow(x,y,z=None)',
    'range()',
    'raw_input(prompt=None)',
    'reduce(function,sequence)',
    'reload(module)',
    'repr(object)',
    'round(number,ndigits=0)',
    'setattr(object,name,value)',
    'sorted(iterable,cmp=None,key=None,reverse=False)',
    'sum(sequence,start=0)',
    'unichr(i)',
    'vars()',
    'zip(sequence)'
)
# Maps the textual default values used in __builtins to real objects.
__builtin_defaults = {
    '""': "",
    '-1': -1,
    '0' : 0,
    # Same value as the octal literal 0666, spelled so that it does not
    # depend on the interpreter's octal literal syntax.
    '0666': int('0666', 8),
    'False': False,
    'None': None,
    'True': True,
    '[]': list(),
    '{}': dict()
}
# Cache: function name -> (parameter names, defaults); built on first use.
__builtin_functions = None
def _parse_builtin_prototype(proto):
    """Split one entry of __builtins into (name, params, defaults).

    params is the list of parameter names in declaration order and
    defaults maps the names of optional parameters to their default
    values.  Raises ValueError if a default is not a key of
    __builtin_defaults.
    """
    paren = proto.find('(')
    fname, params, defaults = proto[:paren], list(), dict()
    for param in proto[paren+1:-1].split(','):
        if not param:
            # 'dir()' and friends: splitting '' yields one empty string,
            # which is not a parameter.
            continue
        param, has_default, value = param.partition('=')
        if has_default:
            try:
                defaults[param] = __builtin_defaults[value]
            except KeyError:
                raise ValueError( 'builtin function %s: parameter %s: unknown default %r' % ( fname, param, value ) )
        params.append(param)
    return fname, params, defaults
def __lookup_builtin(name):
    """Lookup the parameter names and default parameter values for the
    builtin function called name.

    Returns the tuple (number of parameters, parameter names, defaults).
    A function without a prototype in __builtins is reported once on
    stderr and is from then on treated as having no named parameters.
    """
    global __builtin_functions
    if __builtin_functions is None:
        builtins = dict()
        for proto in __builtins:
            # The parsed name gets its own variable: reusing 'name' here
            # would clobber the argument and make the first lookup
            # return the last prototype of the table.
            fname, params, defaults = _parse_builtin_prototype(proto)
            builtins[fname] = ( params, defaults )
        __builtin_functions = builtins
    try:
        params, defaults = __builtin_functions[name]
    except KeyError:
        params, defaults = tuple(), dict()
        # Remember the miss so the warning is printed only once per name.
        __builtin_functions[name] = ( params, defaults )
        sys.stderr.write("Warning: builtin function %r is missing prototype\n" % name)
    return ( len(params), params, defaults )
# Per-thread flag that is True while a trace wrapper is formatting its
# own log output.  Wrappers entered while the flag is set call the
# wrapped function directly without logging (see the first test in
# 'wrapper'), so traced code reached during formatting is not traced.
_ = ThreadLocal()
_.initialValue = False
def trace(_name):
    """Function decorator that logs function entry and exit details.

    _name is a string, an instance of logging.Logger or a function:

    * a function or method - it is wrapped right away and the wrapper
      is returned; the logger is obtained from the logger factory using
      the fully qualified function name.
    * a logger object (see 'loggable') - a decorator is returned that
      logs to that object.
    * a string - a decorator is returned that logs to the logger the
      logger factory returns for that name.

    Constructs a function or method proxy to generate call traces.
    Entry is logged as '>>> name( args )' and exit as '<<< name',
    followed by ' => result' or by a description of the exception
    raised.  Both messages are logged at DEBUG level.
    """
    def decorator(_func):
        """This is the actual decorator function that wraps the
        _func function for detailed logging.

        Returns the wrapped callable; callables of unsupported types and
        the methods __init__, __new__, __repr__ and __str__ are returned
        without a logging wrapper.  Raises ValueError if the _name given
        to trace() is not a callable, a logger or a string.
        """
        def positional(name, value):
            """Format one named positional argument.
            """
            # __self / __klass are either False or the very string
            # object taken from co_varnames[0], hence the identity test.
            if name is __self:
                return af_self(name, value)
            elif name is __klass:
                return af_class(name, value)
            else:
                return af_named(name, value)
        def wrapper(*__argv, **__kwds):
            # Fast path: DEBUG is off for this logger, or this thread is
            # currently inside another wrapper's formatting phase.
            if not logger.isEnabledFor(logging.DEBUG) or _.value:
                return _func(*__argv, **__kwds)
            try:
                _.value = True
                # Map argument names to effective values: declared
                # defaults, overridden by keyword arguments, overridden
                # by positional arguments.
                params = dict(co_defaults)
                params.update(__kwds)
                params.update(zip(co_varnames, __argv))
                # Named parameters that were filled positionally.
                position = [ positional(n, params.pop(n)) for n in co_varnames[:len(__argv)] ]
                # Remaining declared parameters, shown with the 'default'
                # formatter.  NOTE(review): params.pop(n) raises KeyError
                # for a parameter that has no default and was not passed.
                defaults = [ af_default(n, params.pop(n)) for n in co_varnames[len(__argv):] ]
                # Positional arguments beyond the declared parameters.
                nameless = ( af_unnamed(v) for v in __argv[co_argcount:] )
                # Whatever is left in params: extra keyword arguments,
                # listed in name order.
                keywords = ( af_keyword(n, params[n]) for n in sorted(params.keys()) )
                # filter(None, ...) drops arguments formatted as ''.
                params = ', '.join(filter(None, chain(position, defaults, nameless, keywords)))
                # params = params.replace(', [', '[, ').replace('][, ', ', ')
                enter = [ pre_enter ]
                if params:
                    enter.append(' ')
                    enter.append(params)
                    enter.append(' ')
                enter.append(')')
                leave = [ pre_leave ]
                try:
                    logger.debug(''.join(enter))
                    try:
                        try:
                            # Clear the flag for the duration of the real
                            # call so nested traced calls are logged.
                            _.value = False
                            result = _func(*__argv, **__kwds)
                        finally:
                            _.value = True
                    except:
                        # Describe the exception in the exit message and
                        # re-raise it unchanged.
                        type, value, traceback = sys.exc_info()
                        leave.append(' => exception thrown\n\traise ')
                        __mname = type.__module__
                        if __mname != '__main__':
                            leave.append(__mname)
                            leave.append('.')
                        leave.append(type.__name__)
                        if value.args:
                            leave.append('(')
                            leave.append(', '.join(chop(v) for v in value.args))
                            leave.append(')')
                        else:
                            leave.append('()')
                        raise
                    else:
                        # A result of None is not shown.
                        if result is not None:
                            leave.append(' => ')
                            leave.append(chop(result))
                finally:
                    # The exit message is logged on both the normal and
                    # the exceptional path.
                    logger.debug(''.join(leave))
            finally:
                _.value = False
            return result
        ####
        # decorator
        ####
        # __self / __klass start out as flags and are later replaced by
        # the name of the first parameter (see the end of this function).
        __self = False
        __klass = False
        # Applied to the result so a class method stays a class method.
        __rewrap = lambda x: x
        if type(_func) in FunctionTypes:
            # functions do not belong to a class.
            __cname = None
        elif type(_func) in MethodTypes:
            # im_self is None for unbound instance methods.
            # Assumption: trace is only called on unbound methods.
            if _func.im_self is not None:
                # A method bound to something: im_self is taken to be
                # the class, i.e. this is handled as a class method.
                __rewrap = classmethod
                __cname = _func.im_self.__name__
                __klass = True
            else:
                __cname = _func.im_class.__name__
                __self = True
            # From here on work with the underlying plain function.
            _func = _func.im_func
        else:
            # other callables are not supported yet.
            return _func
        __module = _func.__module__
        __fname = _func.__name__
        # Do not wrap initialization and conversion methods.
        if __fname in ('__init__', '__new__', '__repr__', '__str__'):
            return __rewrap(_func)
        # Generate the Fully Qualified Function Name.
        __fqfn = list()
        if __module != '__main__':
            __fqfn.append(__module)
            __fqfn.append('.')
        if __cname is not None:
            __fqfn.append(__cname)
            __fqfn.append('.')
        # NOTE(review): __fqcn (the name without the function part) is
        # not read anywhere in this function.
        __fqcn = ''.join(__fqfn)
        __fqfn.append(__fname)
        __fqfn = ''.join(__fqfn)
        # Pick the logger according to what was passed to trace().
        if type(_name) in CallableTypes:
            logger = getLoggerFactory().getLogger(__fqfn)
        elif loggable(_name):
            logger = _name
        elif isinstance(_name, basestring):
            logger = getLoggerFactory().getLogger(_name)
        else:
            raise ValueError('invalid object %r: must be a function, a method, a string or an object that implements the Logger API' % _name)
        # Constant parts of the entry and exit messages.
        pre_enter = [ '>>> ' ]
        pre_enter.append(__fqfn)
        pre_enter.append('(')
        pre_enter = ''.join(pre_enter)
        pre_leave = [ '<<< ' ]
        pre_leave.append(__fqfn)
        pre_leave = ''.join(pre_leave)
        ####
        # Here we are really mucking around in function internals.
        # func_code is the low level 'code' instance that describes
        # the function arguments, variable and other stuff.
        #
        # func.func_code.co_argcount - number of function arguments.
        # func.func_code.co_varnames - function variables names, the
        # first co_argcount values are the argument names.
        # func.func_defaults - contains default arguments
        try:
            code = _func.func_code
        except AttributeError:
            # No code object: fall back to the prototype table.
            co_argcount , \
            co_varnames , \
            co_defaults = __lookup_builtin(_func.__name__)
        else:
            co_argcount = code.co_argcount
            co_varnames = code.co_varnames[:co_argcount]
            if _func.func_defaults:
                # Defaults belong to the last len(func_defaults) names.
                co_defaults = dict(zip(co_varnames[-len(_func.func_defaults):], _func.func_defaults))
            else:
                co_defaults = dict()
        # Turn the flags into the name of the first parameter so that
        # 'positional' can recognise the class / instance argument.
        if __klass:
            __klass = co_varnames[0]
        if __self:
            __self = co_varnames[0]
        # wraps() copies the name, module and docstring of _func.
        return __rewrap(wraps(_func)(wrapper))
    ####
    # trace
    ####
    # Executed on every call of trace(), before anything is decorated.
    logging.basicConfig(level = logging.DEBUG)
    if type(_name) in CallableTypes:
        # Used as a bare '@trace': _name is the function to wrap.
        return decorator(_name)
    else:
        # Used as '@trace(logger_or_name)': hand back the decorator.
        return decorator
######################################################################
# attach: apply decorator to a class or module
######################################################################
def attachToProperty(decorator, klass, k, prop_attr, prop_decorator):
    """Decorate one accessor of the property stored as klass.k.

    prop_attr is the accessor function (the property's fget, fset or
    fdel, possibly None) and prop_decorator the matching method of the
    property (getter, setter or deleter).  When an accessor exists it is
    temporarily stored as klass.k so that it is fetched through the
    class before being decorated.  Returns the property produced by
    prop_decorator.
    """
    wrapped = None
    if prop_attr is not None:
        setattr(klass, k, prop_attr)
        wrapped = decorator(getattr(klass, k))
    # Passing "None" to the property decorator causes the new property
    # to assume the original value of the associated attribute.
    return prop_decorator(wrapped)
def attachToClass(decorator, klass, recursive = True):
    """Apply decorator to everything defined directly on klass.

    Plain functions and class methods are replaced by their decorated
    version, static methods are decorated and wrapped in staticmethod
    again, and properties are rebuilt from their decorated getter,
    setter and deleter.  Classes found in the class body are processed
    the same way when recursive is true.
    """
    for k, v in klass.__dict__.iteritems():
        kind = type(v)
        if kind is types.FunctionType or kind is ClassMethodType:
            # Fetch through the class rather than using v directly.
            member = getattr(klass, k)
            setattr(klass, k, decorator(member))
        elif kind is StaticMethodType:
            member = getattr(klass, k)
            setattr(klass, k, staticmethod(decorator(member)))
        elif kind is PropertyType:
            prop = getattr(klass, k)
            # Each step returns a new property carrying one more
            # decorated accessor; the last one is stored on the class.
            for accessor, rebuild in (('fget', 'getter'), ('fset', 'setter'), ('fdel', 'deleter')):
                prop = attachToProperty(decorator, klass, k, getattr(prop, accessor), getattr(prop, rebuild))
            setattr(klass, k, prop)
        elif recursive and inspect.isclass(v):
            attachToClass(decorator, v, recursive)
def attach(decorator, obj, recursive = True):
    """attach(decorator, class_or_module[, recursive = True])

    Utility to attach a decorator to the obj instance.

    If obj is a module, the decorator will be attached to every
    function and class in the module.

    If obj is a class, the decorator will be attached to every
    method and subclass of the class.

    If recursive is "True" then subclasses will be decorated.

    Objects that are neither a module nor a class are left alone.
    """
    if inspect.isclass(obj):
        attachToClass(decorator, obj, recursive)
        return
    if not inspect.ismodule(obj):
        return
    # Module: first the functions, then the classes.
    functions = inspect.getmembers(obj, inspect.isfunction)
    for name, fn in functions:
        setattr(obj, name, decorator(fn))
    classes = inspect.getmembers(obj, inspect.isclass)
    for name, klass in classes:
        attachToClass(decorator, klass, recursive)
######################################################################
# class TraceMetaClass
######################################################################
class TraceMetaClass(type):
    """Metaclass to automatically attach the 'trace' decorator to all
    methods, static method and class methods of the class.

    If the class body defines '__logger__' (a logger or a logger name)
    it is passed to trace(); otherwise trace is applied bare.  Classes
    nested in the class body are not decorated.
    """
    def __new__(meta, className, bases, classDict):
        klass = super(TraceMetaClass, meta).__new__(meta, className, bases, classDict)
        # 'in' replaces the deprecated dict.has_key().
        if '__logger__' in classDict:
            hook = trace(classDict['__logger__'])
        else:
            hook = trace
        # recursive = False: only this class itself is decorated.
        attachToClass(hook, klass, False)
        return klass
if __name__ == '__main__':
logging.basicConfig(level = logging.DEBUG)
logger = logging.root
class Test(object):
__logger__ = logger
__metaclass__ = TraceMetaClass
@classmethod
def classMethod(klass):
pass
@staticmethod
def staticMethod():
pass
__test = None
@property
def test(self):
return self.__test
@test.setter
def test(self, value):
self.__test = value
def method(self):
pass
Test.classMethod()
Test.staticMethod()
test = Test()
test.test = 1
assert 1 == test.test
test.method()
class Test(object):
@classmethod
def classMethod(klass):
pass
@staticmethod
def staticMethod():
pass
__test = None
@property
def test(self):
return self.__test
@test.setter
def test(self, value):
self.__test = value
def method(self):
pass
def __str__(self):
return 'Test(' + str(self.test) + ')'
attach(trace(logger), Test)
Test.classMethod()
Test.staticMethod()
test = Test()
test.test = 1
assert 1 == test.test
test.method()
print str(test)
@trace(logger)
def test(x, y, z = True):
a = x + y
b = x * y
if z: return a
else: return b
test(5, 5)
test(5, 5, False)
setLoggerFactory(PrependLoggerFactory())
@trace('main')
def test(x, *argv, **kwds):
"""Simple test
"""
return x + sum(argv)
test(5)
test(5, 5)
test(5, 5, False)
test(5, 5, False, name = 10)
test( *xrange(50) )
assert test.__doc__ == 'Simple test\n '
assert test.__name__ == 'test'
myzip = trace('main')(zip)
for i, j in myzip(xrange(5), xrange(5, 10)):
print i, j