Welcome, guest | Sign In | My Account | Store | Cart
#! /usr/bin/env python
######################################################################
#  Copyright (c) by Kevin L. Sitze on 2011-01-23.
#  This code may be used pursuant to the MIT License.
######################################################################

"""This package provides facilities to attach decorators to classes or
modules (possibly recursively).  A tracing decorator is provided for
tracing function and method calls in your applications.

from decorators import *

@trace
def function(...):
    '''Output goes to <TT>logging.getLogger(__module__)</TT>,
    where <TT>__module__</TT> is the name of the module in which
    the function is defined.  If the current module is '__main__'
    then the root logger will be used.
    '''
    pass

@trace(logger)
def function(...):
    '''Output goes to the specified logger instance.
    '''
    pass

@trace("Logger")
def function(...):
    '''Output goes to the named logger instance.
    '''

class ClassName(object):
    '''Output goes to the ClassName logger.  If ClassName is
    not in the '__main__' module then the name of its module
    will be prefixed to the classname (separated by a dot).
    All methods on the class will be traced.
    '''
    __logger__ = loggerOrLoggerName # optional
    __metaclass__ = TraceMetaClass
    def method(...):
        pass

class PostDecorate(object):
    def method(...):
        pass
# attach a decorator to an existing class.
attach(trace(logger), PostDecorate)

You can also attach a decorator to an existing module.
"""

import inspect
import logging
import sys
import thread
import types

from functools import wraps
from itertools import chain

# Types of plain (non-method) callables: builtin functions and functions
# written in Python.  Going through a set guarantees each type is listed
# only once even if two of the names refer to the same type object.
FunctionTypes = tuple(set([types.BuiltinFunctionType, types.FunctionType]))

# Types of method objects.  types.UnboundMethodType is an alternate name
# for types.MethodType on Python 2 and does not exist elsewhere, so it is
# looked up with a fallback instead of being required; on Python 2 the
# resulting table is unchanged.  Going through a set guarantees each type
# is listed only once.
MethodTypes = tuple(set((
    types.BuiltinMethodType,
    types.MethodType,
    getattr(types, 'UnboundMethodType', types.MethodType)
)))

class _C(object):
    """Scratch class used only to capture the descriptor wrapper types.

    Inside the class body the wrapped names are still the raw
    classmethod / staticmethod objects, so their types are recorded
    there, before normal attribute access would unwrap them.
    """

    def classMethod(klass):
        pass
    classMethod = classmethod(classMethod)
    classMethodType = classMethod.__class__

    def staticMethod():
        pass
    staticMethod = staticmethod(staticMethod)
    staticMethodType = staticMethod.__class__

    def propertyMethod(self):
        pass
    propertyMethod = property(propertyMethod)

# Module-level aliases for the descriptor types captured above.
ClassMethodType, StaticMethodType = _C.classMethodType, _C.staticMethodType
PropertyType = _C.__dict__['propertyMethod'].__class__

# Every type that trace() accepts as "something to wrap": the members of
# FunctionTypes and MethodTypes plus the classmethod wrapper type.  The
# table is derived from the other two instead of repeating their members
# so the three cannot drift apart.  It is only used for membership tests,
# so the order of the entries is irrelevant.
CallableTypes = tuple(set(FunctionTypes + MethodTypes + (ClassMethodType,)))

# Names exported by "from <module> import *".  PrependLoggerFactory and
# attachToClass are defined in this module but are not listed here, so
# they have to be imported by name.
__all__ = (
    'ThreadLocal',
    'TraceMetaClass',
    'attach',
    'getLoggerFactory',
    'setLoggerFactory',
    'trace'
)

# Object whose getLogger() is used to create loggers; the logging module
# itself until setLoggerFactory() installs something else.
_logger_factory = logging

def getLoggerFactory():
    """Return the factory object currently used for creating loggers.

    This is the standard logging module unless a replacement has been
    installed.
    """
    # Only reading the module global, so no 'global' statement is needed.
    return _logger_factory

def setLoggerFactory(factory):
    """Install 'factory' as the object used for creating loggers.

    factory -- an object publishing a method or class named
        'getLogger' that takes the name of the logger to retrieve as a
        string.  The logger objects it returns must expose at least
        the methods 'isEnabledFor' and 'debug'.

    The object is stored as given; nothing is validated here.
    """
    global _logger_factory
    _logger_factory = factory

######################################################################
#  class PrependLoggerFactory
######################################################################

class PrependLoggerFactory(object):
    """This is a convenience class for creating new loggers for the
    "trace" decorator.  All loggers created via this class will have a
    user specified prefix prepended to the name of the logger to
    instantiate.
    """
    def __init__(self, prefix = 'trace'):
        """Construct a new "PrependLoggerFactory" instance that
        prepends the value 'prefix' to the name of each logger to be
        created by this class.  Leading and trailing dots are stripped
        from 'prefix'.
        """
        self.__prefix = prefix.strip('.')

    @property
    def prefix(self):
        """The value to prefix to each logger created by this factory,
        without leading or trailing dots.
        """
        return self.__prefix
    @prefix.setter
    def prefix(self, value):
        self.__prefix = value.strip('.')

    def getLogger(self, name):
        """Return the logging.Logger named "<prefix>.<name>".

        Empty components are left out of the joined name, so an empty
        prefix yields the logger called 'name' itself instead of one
        whose name starts with a stray dot.
        """
        parts = [ part for part in (self.__prefix, name) if part ]
        return logging.getLogger('.'.join(parts))

######################################################################
#  class ThreadLocal
######################################################################

class ThreadLocal(object):
    """Instances of this class provide a thread-local variable.

    Every thread sees its own 'value'.  A thread that has not assigned
    one yet reads 'initialValue' instead, which is shared by all
    threads.
    """
    def __init__(self):
        # Imported locally: the module header does not import threading.
        import threading
        # threading.local keeps one slot per thread and discards it when
        # the thread ends.  A table keyed by thread identifier would keep
        # entries of finished threads forever, and because identifiers
        # may be recycled a new thread could read a dead thread's value.
        self.__local = threading.local()
        self.__init = None

    @property
    def value(self):
        """The calling thread's value, or 'initialValue' if the calling
        thread has not assigned one.
        """
        try:
            return self.__local.value
        except AttributeError:
            return self.__init
    @value.setter
    def value(self, value):
        self.__local.value = value

    @property
    def initialValue(self):
        """The value read by threads that have not assigned 'value'.
        Defaults to None.
        """
        return self.__init
    @initialValue.setter
    def initialValue(self, value):
        self.__init = value

######################################################################
#  Decorator: trace
######################################################################

# Prototypes for builtin functions, which carry no code object to inspect.
# Each entry has the form 'name(param,param=default,...)'; every default
# must be one of the keys of __builtin_defaults below.
__builtins = (
    '__import__(name,globals={},locals={},fromlist=[],level=-1)',
    'abs(number)',
    'all(iterable)',
    'any(iterable)',
    'apply(object,args=[],kwargs={})',
    'bin(number)',
    'callable(object)',
    'chr(i)',
    'cmp(x,y)',
    'coerce(x,y)',
    'compile(source,filename,mode,flags=0,dont_inherit=0)',
    'delattr(object,name)',
    'dir()',
    'divmod(x,y)',
    'eval(source,globals={},locals={})',
    'execfile(filename,globals={},locals={})',
    'filter(function,sequence)',
    'format(value,format_spec="")',
    'getattr(object,name)',
    'globals()',
    'hasattr(object,name)',
    'hash(object)',
    'hex(number)',
    'id(object)',
    'input(prompt=None)',
    'intern(string)',
    'isinstance(object,klass)',
    'issubclass(C,B)',
    'iter(collection)',
    'len(object)',
    'locals()',
    'map(function,sequence)',
    'max(iterable,key=None)',
    'min(iterable,key=None)',
    'next(iterator)',
    'oct(number)',
    'open(name,mode=0666,buffering=True)',
    'ord(c)',
    'pow(x,y,z=None)',
    'range()',
    'raw_input(prompt=None)',
    'reduce(function,sequence)',
    'reload(module)',
    'repr(object)',
    'round(number,ndigits=0)',
    'setattr(object,name,value)',
    'sorted(iterable,cmp=None,key=None,reverse=False)',
    'sum(sequence,start=0)',
    'unichr(i)',
    'vars()',
    'zip(sequence)'
)

# Maps the textual defaults used in the prototypes above to the values
# they stand for.
__builtin_defaults = {
    '""': "",
    '-1': -1,
    '0' : 0,
    # Same value as the octal literal 0666, spelled in the form accepted
    # by both Python 2.6+ and Python 3.
    '0666': 0o666,
    'False': False,
    'None': None,
    'True': True,
    '[]': list(),
    '{}': dict()
}

# Parsed form of __builtins, built on the first lookup:
# { function name: ( parameter names, { parameter name: default } ) }
__builtin_functions = None
def __lookup_builtin(name):
    """Lookup the parameter names and default parameter values for
    a builtin function.

    name -- the name of the builtin function, e.g. 'zip'.

    Returns a tuple ( count, params, defaults ): the number of named
    parameters, the sequence of parameter names and a dict mapping
    parameter names to their default values.  A function without an
    entry in the prototype table is reported on stderr the first time
    it is looked up and is treated as having no named parameters.

    Raises ValueError if a prototype uses a default value that is
    missing from the defaults table.
    """
    global __builtin_functions
    if __builtin_functions is None:
        builtins = dict()
        for proto in __builtins:
            pos = proto.find('(')
            # Deliberately not called 'name': rebinding the argument here
            # would make the first lookup return the prototype of the
            # last table entry instead of the requested function.
            fname, params, defaults = proto[:pos], list(), dict()
            for param in proto[pos+1:-1].split(','):
                if not param:
                    # An empty parameter list splits into [''], which is
                    # not a parameter.
                    continue
                pos = param.find('=')
                if not pos < 0:
                    param, value = param[:pos], param[pos+1:]
                    try:
                        defaults[param] = __builtin_defaults[value]
                    except KeyError:
                        raise ValueError( 'builtin function %s: parameter %s: unknown default %r' % ( fname, param, value ) )
                params.append(param)
            builtins[fname] = ( params, defaults )
        __builtin_functions = builtins

    try:
        params, defaults = __builtin_functions[name]
    except KeyError:
        params, defaults = tuple(), dict()
        # Remember the miss so the warning is only written once per name.
        __builtin_functions[name] = ( params, defaults )
        sys.stderr.write("Warning: builtin function %r is missing prototype\n" % name)
    return ( len(params), params, defaults )

# Longest repr, in characters, that chop() returns unshortened.
MAX_SIZE = 320

def chop(value):
    """Return repr(value), shortened if it is longer than MAX_SIZE.

    A shortened result is made of the first MAX_SIZE characters of the
    repr, the marker '...' and the last character of the repr (for a
    long string that is its closing quote).
    """
    text = repr(value)
    if len(text) <= MAX_SIZE:
        return text
    return ''.join((text[:MAX_SIZE], '...', text[-1]))

def loggable(obj):
    """Return "True" if 'obj' can be used as a logger by the 'trace'
    decorator.

    That is the case when 'obj' is an instance of the logger class
    currently configured in the logging module, or when 'obj' is a
    class whose 'debug', 'isEnabledFor' and 'setLevel' attributes are
    all methods according to inspect.ismethod().
    """
    if isinstance(obj, logging.getLoggerClass()):
        return True
    if not inspect.isclass(obj):
        return False
    for required in ('debug', 'isEnabledFor', 'setLevel'):
        if not inspect.ismethod(getattr(obj, required, None)):
            return False
    return True

# Re-entrancy guard shared by every traced function.  Its per-thread value
# is True while the current thread is busy inside a tracing wrapper, so
# traced calls made while the log output is being produced are passed
# straight through instead of being traced themselves.
_ = ThreadLocal()
_.initialValue = False

def trace(_name):
    """Function decorator that logs function entry and exit details.

    _name -- a function or method, a logger object, or a string.

    If _name is a function or method it is wrapped right away and the
    result is returned, which is what makes the bare "@trace" form
    work; the logger is then requested from the current logger factory
    under the fully qualified name of the function.  For anything else
    a decorator is returned.  That decorator logs to _name itself when
    loggable(_name) is true and to the logger the factory returns for
    _name when it is a string; for any other object it raises
    ValueError when it is applied.

    Calls are logged at DEBUG level: a ">>> name( arguments )" record
    on entry and a "<<< name" record on exit, the latter followed by
    the result (unless it is None) or by the exception that was raised.

    Note that every call to trace() also calls
    logging.basicConfig(level = logging.DEBUG).
    """
    def decorator(_func):
        """This is the actual decorator function that wraps the _func
        function for detailed logging.

        Callables that are neither functions nor methods are returned
        unchanged, and __init__, __new__, __repr__ and __str__ are
        returned without a tracing wrapper.
        """

        def format_key_val(key, val):
            """Return the "name = value" text logged for one argument.

            The instance argument of a method and the class argument of
            a class method are rendered in a fixed short form that does
            not call repr() on them; every other value goes through
            chop().
            """
            if key is __self:
                __mname = val.__module__
                if __mname != '__main__':
                    return '%s = <%s.%s object at 0x%x>' % (key, __mname, val.__class__.__name__, id(val))
                else:
                    return '%s = <%s object at 0x%x>' % (key, val.__class__.__name__, id(val))
            elif key is __klass:
                __mname = val.__module__
                if __mname != '__main__':
                    return "%s = <type '%s.%s'>" % (key, __mname, val.__name__)
                else:
                    return "%s = <type '%s'>" % (key, val.__name__)
            else:
                return '%s = %s' % (key, chop(val))

        def wrapper(*__argv, **__kwds):
            # Pass straight through when DEBUG logging is disabled or
            # when this thread is already inside a tracing wrapper.
            if not logger.isEnabledFor(logging.DEBUG) or _.value:
                return _func(*__argv, **__kwds)

            try:
                _.value = True
                # Start from the pre-formatted default values and overlay
                # the arguments that were actually supplied.
                params = dict(co_defaults)
                params.update((k, format_key_val(k, v)) for k, v in __kwds.iteritems())
                params.update((k, format_key_val(k, v)) for k, v in zip(co_varnames, __argv))

                # Parameters the caller did not supply are skipped, so a
                # bad call is reported by the wrapped function itself
                # (TypeError) rather than by a KeyError raised here.
                position = [ params.pop(x) for x in co_varnames if x in params ]
                nameless = (chop(x) for x in __argv[co_argcount:])
                keywords = params.itervalues()

                params = ', '.join(chain(position, nameless, keywords))
                # params = params.replace(', [', '[, ').replace('][, ', ', ')

                enter = [ pre_enter ]
                if params:
                    enter.append(' ')
                    enter.append(params)
                    enter.append(' ')
                enter.append(')')

                leave = [ pre_leave ]

                try:
                    logger.debug(''.join(enter))
                    try:
                        try:
                            # Lift the guard for the duration of the real
                            # call so that nested traced calls are logged.
                            _.value = False
                            result = _func(*__argv, **__kwds)
                        finally:
                            _.value = True
                    except:
                        # Deliberately bare: whatever was raised is
                        # described in the exit record and re-raised
                        # unchanged.  Only the type and value are kept,
                        # so no traceback reference is held by a local.
                        exc_type, exc_value = sys.exc_info()[:2]
                        leave.append(' => exception thrown\n\traise ')
                        __mname = exc_type.__module__
                        if __mname != '__main__':
                            leave.append(__mname)
                            leave.append('.')
                        leave.append(exc_type.__name__)
                        if exc_value.args:
                            leave.append('(')
                            leave.append(', '.join(chop(v) for v in exc_value.args))
                            leave.append(')')
                        else:
                            leave.append('()')
                        raise
                    else:
                        if result is not None:
                            leave.append(' => ')
                            leave.append(chop(result))
                finally:
                    logger.debug(''.join(leave))
            finally:
                _.value = False

            return result

        ####
        #  decorator
        ####

        # __self / __klass start out as flags and are later replaced by
        # the name of the first parameter (see below); format_key_val
        # compares argument names against them.
        __self  = False
        __klass = False
        __rewrap = lambda x: x
        if type(_func) in FunctionTypes:
            # functions do not belong to a class.
            __cname = None
        elif type(_func) in MethodTypes:
            # im_self is None for unbound instance methods.
            # Assumption: trace is only called on unbound methods.
            if _func.im_self is not None:
                # Bound to a class: treat it as a class method and turn
                # the wrapper back into one.
                __rewrap = classmethod
                __cname = _func.im_self.__name__
                __klass = True
            else:
                __cname = _func.im_class.__name__
                __self  = True
            _func = _func.im_func
        else:
            # other callables are not supported yet.
            return _func
        __module = _func.__module__
        __fname  = _func.__name__

        # Do not wrap initialization and conversion methods.
        if __fname in ('__init__', '__new__', '__repr__', '__str__'):
            return __rewrap(_func)

        # Generate the Fully Qualified Function Name.

        __fqfn = list()
        if __module != '__main__':
            __fqfn.append(__module)
            __fqfn.append('.')
        if __cname is not None:
            __fqfn.append(__cname)
            __fqfn.append('.')
        __fqcn = ''.join(__fqfn)
        __fqfn.append(__fname)
        __fqfn = ''.join(__fqfn)

        # Select the logger according to what was passed to trace().
        if type(_name) in CallableTypes:
            logger = getLoggerFactory().getLogger(__fqfn)
        elif loggable(_name):
            logger = _name
        elif isinstance(_name, basestring):
            logger = getLoggerFactory().getLogger(_name)
        else:
            raise ValueError('invalid object %r: must be a function, a method, a string or an object that implements the Logger API' % _name)

        # Constant leading parts of the entry and exit records.
        pre_enter = [ '>>> ' ]
        pre_enter.append(__fqfn)
        pre_enter.append('(')
        pre_enter = ''.join(pre_enter)

        pre_leave = [ '<<< ' ]
        pre_leave.append(__fqfn)
        pre_leave = ''.join(pre_leave)

        ####
        #  Here we are really mucking around in function internals.
        #  func_code is the low level 'code' instance that describes
        #  the function arguments, variable and other stuff.
        #
        #  func.func_code.co_argcount - number of function arguments.
        #  func.func_code.co_varnames - function variables names, the
        #      first co_argcount values are the argument names.
        #  func.func_defaults - contains default arguments

        try:
            code = _func.func_code
        except AttributeError:
            # No code object (builtin function): use the prototype table.
            co_argcount , \
            co_varnames , \
            co_defaults = __lookup_builtin(_func.__name__)
        else:
            co_argcount = code.co_argcount
            co_varnames = code.co_varnames[:co_argcount]
            if _func.func_defaults:
                co_defaults = dict(zip(co_varnames[-len(_func.func_defaults):], _func.func_defaults))
            else:
                co_defaults = dict()
            if __klass:
                __klass = co_varnames[0]
            if __self:
                __self  = co_varnames[0]
        if co_defaults:
            # Pre-format the defaults once; they are shown in brackets
            # for parameters the caller leaves out.
            co_defaults = dict((n, '[%s = %s]' % (n, chop(v))) for n, v in co_defaults.iteritems())
        return __rewrap(wraps(_func)(wrapper))

    ####
    #  trace
    ####

    logging.basicConfig(level = logging.DEBUG)
    if type(_name) in CallableTypes:
        return decorator(_name)
    else:
        return decorator

######################################################################
#  attach: apply decorator to a class or module
######################################################################

def attachToClass(decorator, klass, recursive = True):
    """Apply 'decorator' to the callables defined directly on 'klass'.

    Functions, class methods, static methods and the accessor
    functions of properties found in the namespace of 'klass' are
    replaced by their decorated versions; static methods and
    properties are rebuilt around the decorated functions.  Inherited
    attributes are left alone.

    If 'recursive' is true, classes stored as attributes of 'klass'
    are processed in the same way.
    """
    # Work on a snapshot: the loop rebinds attributes of klass, and the
    # live class namespace should not be iterated while that happens.
    # The descriptor types are compared against the builtins directly;
    # the module-level *Type aliases are these same objects.
    for k, v in list(klass.__dict__.items()):
        t = type(v)
        if t is types.FunctionType or t is classmethod:
            # Fetch through the class so the decorator receives the
            # attribute the way normal attribute access returns it.
            setattr(klass, k, decorator(getattr(klass, k)))
        elif t is staticmethod:
            setattr(klass, k, staticmethod(decorator(getattr(klass, k))))
        elif t is property:
            value = getattr(klass, k)
            # Each accessor is temporarily installed on the class under
            # the property's name, so that it is also fetched through
            # the class before being decorated; the rebuilt property is
            # stored at the end.
            if value.fget is not None:
                setattr(klass, k, value.fget)
                value = value.getter(decorator(getattr(klass, k)))
            if value.fset is not None:
                setattr(klass, k, value.fset)
                value = value.setter(decorator(getattr(klass, k)))
            if value.fdel is not None:
                setattr(klass, k, value.fdel)
                value = value.deleter(decorator(getattr(klass, k)))
            setattr(klass, k, value)
        elif recursive and inspect.isclass(v):
            attachToClass(decorator, v, recursive)

def attach(decorator, obj, recursive = True):
    """attach(decorator, class_or_module[, recursive = True])

    Utility to attach 'decorator' to the contents of 'obj'.

    If 'obj' is a module, every function found in the module is
    replaced by its decorated version and every class found in the
    module is handed to attachToClass().

    If 'obj' is a class, it is handed to attachToClass() directly.

    'recursive' is forwarded to attachToClass() unchanged.  Any other
    kind of object is silently ignored.
    """
    if inspect.isclass(obj):
        attachToClass(decorator, obj, recursive)
        return
    if not inspect.ismodule(obj):
        return

    # Rebind the module's functions first, then walk its classes.
    for fname, fn in inspect.getmembers(obj, inspect.isfunction):
        setattr(obj, fname, decorator(fn))
    for cname, klass in inspect.getmembers(obj, inspect.isclass):
        attachToClass(decorator, klass, recursive)

######################################################################
#  class TraceMetaClass
######################################################################

class TraceMetaClass(type):
    """Metaclass to automatically attach the 'trace' decorator to all
    methods, static method and class methods of the class.

    If the class body defines '__logger__', its value is passed to
    trace() to choose the logger; otherwise the bare trace decorator
    is used.  The decorator is attached non-recursively, so classes
    nested inside the class body are not processed.
    """
    def __new__(meta, className, bases, classDict):
        klass = super(TraceMetaClass, meta).__new__(meta, className, bases, classDict)
        # 'in' instead of the deprecated dict.has_key().
        if '__logger__' in classDict:
            hook = trace(classDict['__logger__'])
        else:
            hook = trace
        attachToClass(hook, klass, False)
        return klass

if __name__ == '__main__':
    # Demonstration / smoke test.  The names 'Test' and 'test' are rebound
    # several times below, so the order of the statements matters.
    logging.basicConfig(level = logging.DEBUG)
    logger = logging.root

    # 1. Tracing attached by the metaclass, logging to the root logger
    #    named through __logger__.
    class Test(object):
        __logger__ = logger
        __metaclass__ = TraceMetaClass

        @classmethod
        def classMethod(klass):
            pass

        @staticmethod
        def staticMethod():
            pass

        __test = None
        @property
        def test(self):
            return self.__test
        @test.setter
        def test(self, value):
            self.__test = value

        def method(self):
            pass

    Test.classMethod()
    Test.staticMethod()
    test = Test()
    test.test = 1
    assert 1 == test.test
    test.method()

    # 2. A class defined without the metaclass; tracing is attached
    #    afterwards with attach().
    class Test(object):
        @classmethod
        def classMethod(klass):
            pass

        @staticmethod
        def staticMethod():
            pass

        __test = None
        @property
        def test(self):
            return self.__test
        @test.setter
        def test(self, value):
            self.__test = value

        def method(self):
            pass

        def __str__(self):
            return 'Test(' + str(self.test) + ')'

    attach(trace(logger), Test)
    Test.classMethod()
    Test.staticMethod()
    test = Test()
    test.test = 1
    assert 1 == test.test
    test.method()
    # __str__ is one of the names trace() never wraps.
    print str(test)

    # 3. A plain function traced with an explicit logger; 'z' has a
    #    default value.
    @trace(logger)
    def test(x, y, z = True):
        a = x + y
        b = x * y
        if z: return a
        else: return b

    test(5, 5)
    test(5, 5, False)

    # 4. From here on loggers requested by name are created by a
    #    PrependLoggerFactory with its default prefix, so trace('main')
    #    uses the logger named 'trace.main'.
    setLoggerFactory(PrependLoggerFactory())

    @trace('main')
    def test(x, *argv, **kwds):
        """Simple test
        """
        return x + sum(argv)

    test(5)
    test(5, 5)
    test(5, 5, False)
    test(5, 5, False, name = 10)

    # 50 positional arguments.
    test( *xrange(50) )

    # The wrapper is built with functools.wraps, so the docstring and the
    # name of the original function must survive.
    assert test.__doc__ == 'Simple test\n        '
    assert test.__name__ == 'test'

    # 5. A builtin function can be traced as well; it has no code object,
    #    so its parameter names come from the builtin prototype table.
    myzip = trace('main')(zip)
    for i, j in myzip(xrange(5), xrange(5, 10)):
        print i, j

History