Welcome, guest | Sign In | My Account | Store | Cart

This module implements a function guard - a facility to redirect a call to one of several function implementations at run time, based on the actual call arguments.

Wrap each of the identically named functions in a @guard decorator and provide a _when parameter whose default value is set to the guard expression.

See samples at the top of the module.

Python, 379 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
#!/usr/bin/env python3
#-*- coding: iso-8859-1 -*-
################################################################################
#
# Function guards for Python 3.
#
# (c) 2016, Dmitry Dvoinikov <dmitry@targeted.org>
# Distributed under MIT license.
#
# Samples:
#
# from funcguard import guard
#
# @guard
# def abs(a, _when = "a >= 0"):
#     return a
#
# @guard
# def abs(a, _when = "a < 0"):
#     return -a
#
# assert abs(1) == abs(-1) == 1
#
# @guard
# def factorial(n): # no _when expression => default
#    return 1
#
# @guard
# def factorial(n, _when = "n > 1"):
#    return n * factorial(n - 1)
#
# assert factorial(10) == 3628800
#
# class TypeTeller:
#     @staticmethod
#     @guard
#     def typeof(value, _when = "isinstance(value, int)"):
#         return int
#     @staticmethod
#     @guard
#     def typeof(value, _when = "isinstance(value, str)"):
#         return str
#
# assert TypeTeller.typeof(0) is int
# TypeTeller.typeof(0.0) # throws
#
# class AllowedProcessor:
#     def __init__(self, allowed):
#         self._allowed = allowed
#     @guard
#     def process(self, value, _when = "value in self._allowed"):
#         return "ok"
#     @guard
#     def process(self, value): # no _when expression => default
#         return "fail"
#
# ap = AllowedProcessor({1, 2, 3})
# assert ap.process(1) == "ok"
# assert ap.process(0) == "fail"
#
# guard.default_eval_args( # values to insert to all guards scopes
#     office_hours = lambda: 9 <= datetime.now().hour < 18)
#
# @guard
# def at_work(*args, _when = "office_hours()", **kwargs):
#     print("welcome")
#
# @guard
# def at_work(*args, **kwargs):
#     print("come back tomorrow")
#
# at_work() # either "welcome" or "come back tomorrow"
#
# The complete source code with self-tests is available from:
# https://github.com/targeted/funcguard
#
################################################################################

__all__ = [ "guard", "GuardException", "IncompatibleFunctionsException",
            "FunctionArgumentsMatchException", "GuardExpressionException",
            "DuplicateDefaultGuardException", "GuardEvalException",
            "NoMatchingFunctionException" ]

################################################################################

import inspect; from inspect import getfullargspec
import functools; from functools import wraps
import sys; from sys import modules
# Bind a module-level qualname(f) helper returning a function's qualified name.
# The probe below checks whether function objects expose __qualname__ at all.
try:
    (lambda: None).__qualname__
except AttributeError:
    # no __qualname__ attribute on functions: fall back to the separate
    # "qualname" module, which must then be importable
    import qualname; from qualname import qualname # prior to Python 3.3 workaround
else:
    # the attribute exists, so simply read it
    qualname = lambda f: f.__qualname__

################################################################################

class GuardException(Exception):
    """Base class for every error raised by this module."""


class IncompatibleFunctionsException(GuardException):
    """A guarded function's signature differs from the one already registered."""


class FunctionArgumentsMatchException(GuardException):
    """The actual call arguments cannot be mapped onto the function parameters."""


class GuardExpressionException(GuardException):
    """The "_when" guard expression is missing or does not compile."""


class DuplicateDefaultGuardException(GuardException):
    """A second default (unguarded) version of the same function was declared."""


class GuardEvalException(GuardException):
    """Evaluating a guard expression against the call arguments failed."""


class NoMatchingFunctionException(GuardException):
    """No version of the guarded function accepted the call arguments."""

################################################################################
# takes an argument specification for a function and a set of actual call
# positional and keyword arguments, returns a flat namespace-like dict
# mapping parameter names to their actual values

def _eval_args(argspec, args, kwargs):
    """Map actual call arguments onto the parameter names of a function.

    argspec is the result of inspect.getfullargspec() for the function,
    args and kwargs are the positional and keyword arguments of the call.

    Returns a flat namespace-like dict containing every named parameter
    (positional and keyword-only, defaults applied where the call did not
    provide a value) plus the *args tuple and the **kwargs dict under their
    declared names if the function has them.

    Raises FunctionArgumentsMatchException if the call does not fit the
    signature: a required argument is missing, too many positional arguments
    are given, an unexpected keyword is given, or the same parameter is
    supplied both positionally and by keyword.
    """

    def _many(names):
        return "argument" + ("s" if len(names) != 1 else "")

    # match positional arguments

    matched_args = {}
    expected_args = argspec.args
    default_args = argspec.defaults or ()
    first_default = len(expected_args) - len(default_args)

    # a parameter already filled positionally must not be repeated by keyword

    duplicate_args = [ name for name in expected_args[:len(args)] if name in kwargs ]
    if duplicate_args:
        raise FunctionArgumentsMatchException("got multiple values for {0:s}: {1:s}".\
                  format(_many(duplicate_args), ", ".join(duplicate_args)))

    # copy provided args to expected; a positional-or-keyword parameter
    # can also be supplied by keyword, otherwise fall back to its default

    keyword_supplied = set() # positional parameters taken from kwargs
    missing_args = []
    for i, name in enumerate(expected_args):
        if i < len(args):
            matched_args[name] = args[i]
        elif name in kwargs:
            matched_args[name] = kwargs[name]
            keyword_supplied.add(name)
        elif i >= first_default:
            matched_args[name] = default_args[i - first_default]
        else:
            missing_args.append(name)
    if missing_args:
        raise FunctionArgumentsMatchException("missing required positional {0:s}: {1:s}".\
                  format(_many(missing_args), ", ".join(missing_args)))

    # put extra provided args to *args if the function allows

    if argspec.varargs:
        matched_args[argspec.varargs] = args[len(expected_args):] if len(args) > len(expected_args) else ()
    elif len(args) > len(expected_args):
        raise FunctionArgumentsMatchException(
                  "takes {0:d} positional {1:s} but {2:d} {3:s} given".
                  format(len(expected_args), _many(expected_args),
                         len(args), "was" if len(args) == 1 else "were"))

    # match keyword arguments

    matched_kwargs = {}
    expected_kwargs = argspec.kwonlyargs
    default_kwargs = argspec.kwonlydefaults or {}

    # extract expected kwargs from provided, using defaults if necessary

    missing_kwargs = []
    for name in expected_kwargs:
        if name in kwargs:
            matched_kwargs[name] = kwargs[name]
        elif name in default_kwargs:
            matched_kwargs[name] = default_kwargs[name]
        else:
            missing_kwargs.append(name)
    if missing_kwargs:
        raise FunctionArgumentsMatchException("missing required keyword {0:s}: {1:s}".\
                  format(_many(missing_kwargs), ", ".join(missing_kwargs)))

    # whatever keywords have not been consumed by a named parameter
    # go to **kwargs if the function allows

    extra_kwarg_names = [ name for name in kwargs
                          if name not in matched_kwargs and name not in keyword_supplied ]
    if argspec.varkw:
        matched_args[argspec.varkw] = { name: kwargs[name] for name in extra_kwarg_names }
    elif extra_kwarg_names:
        raise FunctionArgumentsMatchException("got unexpected keyword {0:s}: {1:s}".\
                  format(_many(extra_kwarg_names), ", ".join(extra_kwarg_names)))

    # both positional and keyword argument are returned in the same scope-like dict

    matched_args.update(matched_kwargs)

    return matched_args

################################################################################
# takes an argument specification for a function, from it extracts and returns
# a compiled expression which is to be matched against call arguments

def _get_guard_expr(func_name, argspec):
    """Extract and compile the guard expression declared by a function.

    The expression text is the default value of the "_when" parameter,
    which may be declared either as a regular or as a keyword-only one.

    Returns the compiled (eval mode) code object, or None if the function
    has no "_when" parameter at all, which marks it as the default version.

    Raises GuardExpressionException if "_when" is declared without a default
    value (or with None) or if the expression text fails to compile.
    """

    positional = argspec.args

    if "_when" in positional:
        defaults = argspec.defaults or ()
        # index of _when within the defaults tuple, negative if it has none
        offset = positional.index("_when") - (len(positional) - len(defaults))
        expr_text = defaults[offset] if offset >= 0 else None
    elif "_when" in argspec.kwonlyargs:
        expr_text = (argspec.kwonlydefaults or {}).get("_when")
    else:
        return None # indicates default guard

    if expr_text is None:
        raise GuardExpressionException("guarded function {0:s}() requires a \"_when\" "
                                       "argument with guard expression text as its "
                                       "default value".format(func_name))

    # the failure is reported outside of the except clause so that
    # the original exception is not chained to the one raised here

    compiled = failure = None
    try:
        compiled = compile(expr_text, func_name, "eval")
    except Exception as e:
        failure = str(e)

    if failure is not None:
        raise GuardExpressionException("invalid guard expression for {0:s}(): "
                                       "{1:s}".format(func_name, failure))

    return compiled

################################################################################
# checks whether two functions' argspecs are compatible to be guarded as one,
# compatible argspecs have identical positional and keyword parameters except
# for "_when" and annotations

def _compatible_argspecs(argspec1, argspec2):
    """Tell whether two argspecs are identical once "_when" is stripped from both."""
    first = _stripped_argspec(argspec1)
    second = _stripped_argspec(argspec2)
    return first == second

def _stripped_argspec(argspec):
    """Return the comparable part of an argspec with "_when" removed.

    The result is a tuple of (positional names, positional defaults,
    keyword-only names, keyword-only defaults, *args name, **kwargs name),
    built from copies so that the argspec itself is left untouched.
    Annotations are not included. "_when" is removed together with its
    default value, and only when it actually has one.
    """

    positional = list(argspec.args)
    positional_defaults = list(argspec.defaults or ())
    kwonly = list(argspec.kwonlyargs)
    kwonly_defaults = dict(argspec.kwonlydefaults or {})

    if "_when" in positional:
        at = positional.index("_when")
        # position of _when within the defaults list, negative if it has none
        default_at = at - (len(positional) - len(positional_defaults))
        if default_at >= 0:
            positional_defaults.pop(default_at)
            positional.pop(at)
    elif "_when" in kwonly and "_when" in kwonly_defaults:
        kwonly.remove("_when")
        kwonly_defaults.pop("_when")

    return (positional, positional_defaults, kwonly, kwonly_defaults,
            argspec.varargs, argspec.varkw)

################################################################################

def guard(func, module = None): # the main decorator function
    """Register func as one version of a guarded function and return its call proxy.

    All versions registered under the same qualified name in the same module
    are linked into a chain. The returned proxy maps the call arguments to
    parameter names once, walks the chain from its first element and calls
    the first version whose "_when" expression evaluates to truth. A version
    without a "_when" parameter is the default one, it always matches and is
    kept at the end of the chain.

    func is the function to guard, possibly already wrapped by other
    decorators exposing __wrapped__. A lambda is returned as is, not guarded.
    module, if given, is the object to which the registry of guarded
    functions is attached instead of the module func belongs to.

    Raises at decoration time:
      IncompatibleFunctionsException if the signature does not match the one
        first registered under this name
      GuardExpressionException if the "_when" expression is missing or invalid
      DuplicateDefaultGuardException if a default version is already the last
        one in the chain and this one is a default too

    The proxy raises at call time:
      FunctionArgumentsMatchException if the arguments do not fit the signature
      GuardEvalException if evaluating a guard expression fails
      NoMatchingFunctionException if no version accepts the call
    """

    # see if it is a function or a lambda: a lambda is named "<lambda>"
    # which is not a valid expression, hence the SyntaxError

    try:
        eval(func.__name__)
    except SyntaxError:
        return func # <lambda> => not guarded
    except NameError:
        pass # valid name

    # get to the bottom of a possible decorator chain
    # to get the original function's specification

    original_func = func
    while hasattr(original_func, "__wrapped__"):
        original_func = original_func.__wrapped__

    func_name = qualname(original_func)
    func_module = module or modules[func.__module__] # module serves only as a place to keep state
    argspec = getfullargspec(original_func)

    # the registry of known guarded functions is attached to the module containing
    # them, it maps a qualified name to a list of [argspec, first_guard, last_guard]

    guarded_functions = getattr(func_module, "__guarded_functions__", None)
    if guarded_functions is None:
        guarded_functions = func_module.__guarded_functions__ = {}

    # guard_info is the mutable registry entry itself, the three names
    # unpacked from it are just local snapshots of its current contents

    original_argspec, first_guard, last_guard = guard_info = \
        guarded_functions.setdefault(func_name, [argspec, None, None])

    # all the guarded functions with the same name must have identical signature
    # (the identity check skips the comparison for the very first registration)

    if argspec is not original_argspec and not _compatible_argspecs(argspec, original_argspec):
        raise IncompatibleFunctionsException("function signature is incompatible "
                    "with the previosly registered {0:s}()".format(func_name))

    @wraps(func)
    def func_guard(*args, **kwargs): # the call proxy function

        # since all versions of the function have essentially identical signatures,
        # their mapping to the actually provided arguments can be calculated once
        # for each call and not against every version of the function

        # the error is re-raised outside of the except clause, prefixed
        # with the function name, so that the exceptions are not chained

        try:
            eval_args = _eval_args(argspec, args, kwargs)
        except FunctionArgumentsMatchException as e:
            error = str(e)
        else:
            error = None
        if error is not None:
            raise FunctionArgumentsMatchException("{0:s}() {1:s}".format(func_name, error))

        # globally registered values are visible to the guard expressions
        # but never override an actual argument with the same name

        for name, value in guard.__default_eval_args__.items():
            eval_args.setdefault(name, value)

        # walk the chain of function versions starting with the first, looking
        # for the one for which the guard expression evaluates to truth,
        # a version with no guard expression is the default and matches always;
        # the expressions are evaluated with this module's globals
        # and the mapped arguments as locals

        current_guard = func_guard.__first_guard__
        while current_guard:
            try:
                if not current_guard.__guard_expr__ or \
                   eval(current_guard.__guard_expr__, globals(), eval_args):
                    break
            except Exception as e:
                error = str(e)
            else:
                error = None
            if error is not None:
                raise GuardEvalException("guard expression evaluation failed for "
                                         "{0:s}(): {1:s}".format(func_name, error))
            current_guard = current_guard.__next_guard__
        else: # the chain has been exhausted without a break
            raise NoMatchingFunctionException("none of the guard expressions for {0:s}() "
                                              "matched the call arguments".format(func_name))

        return current_guard.__wrapped__(*args, **kwargs) # call the winning function version

    # in different version of Python @wraps behaves differently with regards
    # to __wrapped__, therefore we set it the way we need it here

    func_guard.__wrapped__ = func

    # the guard expression is attached (None for the default version)

    func_guard.__guard_expr__ = _get_guard_expr(func_name, argspec)

    # maintain a linked list for all versions of the function,
    # the default version, if any, is always kept last

    if last_guard and not last_guard.__guard_expr__: # the list is not empty and the
                                                     # last guard is already a default
        if not func_guard.__guard_expr__:
            raise DuplicateDefaultGuardException("the default version of {0:s}() has already "
                                                 "been specified".format(func_name))

        # the new guard has to be inserted one before the last

        if first_guard is last_guard: # the list contains just one guard

            # new becomes first, last is not changed

            first_guard.__first_guard__ = func_guard.__first_guard__ = func_guard
            func_guard.__next_guard__ = first_guard
            first_guard = guard_info[1] = func_guard

        else: # the list contains more than one guard

            # neither first nor last are changed,
            # find the guard currently preceding the last one

            prev_guard = first_guard
            while prev_guard.__next_guard__ is not last_guard:
                prev_guard = prev_guard.__next_guard__

            func_guard.__first_guard__ = first_guard
            func_guard.__next_guard__ = last_guard
            prev_guard.__next_guard__ = func_guard

    else: # the new guard is inserted last

        if not first_guard: # the list is empty, the new guard is also the first
            first_guard = guard_info[1] = func_guard
        func_guard.__first_guard__ = first_guard
        func_guard.__next_guard__ = None
        if last_guard:
            last_guard.__next_guard__ = func_guard
        last_guard = guard_info[2] = func_guard

    return func_guard

# values made visible to every guard expression, shared by all guarded
# functions; each call proxy adds them to its scope without overriding
# the actual call arguments
guard.__default_eval_args__ = {}

# public way to register such values, takes the same arguments as dict.update()
guard.default_eval_args = lambda *args, **kwargs: guard.__default_eval_args__.update(*args, **kwargs)

################################################################################
# EOF