# Source code for Goulib.decorators

#!/usr/bin/env python
# coding: utf8
"""useful decorators"""
__author__ = "Philippe Guglielmetti"
__copyright__ = "Copyright 2015, Philippe Guglielmetti"
__credits__ = [""]
__license__ = "LGPL + MIT"

import functools, sys, logging

# Detect whether a debugger is currently tracing this process.
# sys.gettrace may be missing on some interpreters, hence getattr with default.
_gettrace = getattr(sys, 'gettrace', None)
debugger = _gettrace and _gettrace()
# NOTE(review): the call wrapping this message was lost in extraction;
# logging.info is the most plausible original — confirm against upstream.
logging.info('debugger '+('ACTIVE' if debugger else 'INACTIVE'))

def memoize(obj):
    """speed up repeated calls to a function by caching its results in a dict index by params

    :param obj: callable to memoize
    :return: wrapped callable; the cache dict is exposed as attribute ``cache``
    """
    cache = obj.cache = {}

    @functools.wraps(obj)
    def memoizer(*args, **kwargs):
        # key is built from the str() of all params, so arguments only need
        # a stable string representation, not hashability
        key = str(args) + str(kwargs)
        if key not in cache:
            cache[key] = obj(*args, **kwargs)
        return cache[key]
    return memoizer
import logging
def debug(func):
    """decorator that forces the root logger to DEBUG level for the duration of the call

    :param func: decorated function
    :return: wrapped function; previous log level is restored afterwards
    """
    # Customize these messages
    ENTRY_MESSAGE = 'Entering {}'
    EXIT_MESSAGE = 'Exiting {}'

    @functools.wraps(func)
    def wrapper(*args, **kwds):
        logger = logging.getLogger()
        level = logger.getEffectiveLevel()
        logger.setLevel(logging.DEBUG)
        # NOTE(review): these messages were defined but unused in the extracted
        # source; the log calls were presumably lost — confirm against upstream
        logger.debug(ENTRY_MESSAGE.format(func.__name__))
        try:
            return func(*args, **kwds)
        finally:
            # restore the previous level even if func raised
            logger.debug(EXIT_MESSAGE.format(func.__name__))
            logger.setLevel(level)
    return wrapper
def nodebug(func):
    """decorator that silences DEBUG messages (root logger raised to INFO) during the call

    :param func: decorated function
    :return: wrapped function; previous log level is restored afterwards
    """
    @functools.wraps(func)
    def wrapper(*args, **kwds):
        logger = logging.getLogger()
        level = logger.getEffectiveLevel()
        logger.setLevel(logging.INFO)
        try:
            return func(*args, **kwds)
        finally:
            # restore the previous level even if func raised
            logger.setLevel(level)
    return wrapper
# NOTE(review): the URL of the discussion referenced here ("BUT read ...")
# was lost in extraction — restore it from upstream if possible.
import multiprocessing
from multiprocessing.pool import ThreadPool
try:
    import six.moves._thread as thread
except ImportError:  # six not installed : the stdlib module is equivalent
    import _thread as thread
import threading
import weakref

thread_pool = None  # lazily created by get_thread_pool()
def get_thread_pool():
    """lazily create (once) and return the shared single-worker ThreadPool

    :return: module-level ThreadPool(processes=1) used by :func:`timeout`
    """
    global thread_pool
    if thread_pool is None:
        # fix for python <2.7.2 : current_thread may lack the _children
        # attribute that ThreadPool's worker management expects
        if not hasattr(threading.current_thread(), "_children"):
            threading.current_thread()._children = weakref.WeakKeyDictionary()
        thread_pool = ThreadPool(processes=1)
    return thread_pool
def timeout(timeout):
    """decorator that limits the running time of the wrapped function

    :param timeout: float max seconds; a falsy value disables the limit
    :raise: multiprocessing.TimeoutError (from AsyncResult.get) when exceeded
    """
    def wrap_function(func):
        if not timeout:  # no limit requested : return func untouched
            return func

        @functools.wraps(func)
        def __wrapper(*args, **kwargs):
            try:
                # run in the shared worker thread so we can bound the wait
                async_result = get_thread_pool().apply_async(func, args=args, kwds=kwargs)
                return async_result.get(timeout)
            except thread.error:
                # could not start a new thread : fall back to a direct,
                # un-timed call rather than failing
                return func(*args, **kwargs)
        return __wrapper
    return wrap_function
# imports for itimeout below (were commented out by the doc extraction)
from threading import Timer
from multiprocessing import TimeoutError
def itimeout(iterable, timeout):
    """timeout for loops

    :param iterable: any iterable
    :param timeout: float max running time in seconds
    :yield: items in iterator until timeout occurs
    :raise: multiprocessing.TimeoutError if timeout occured
    """
    if False:  # handle debugger better one day ...
        # debugger-friendly variant : bound the number of iterations instead
        n = 100 * timeout
        for i, x in enumerate(iterable):
            yield x
            if i > n:
                break
    else:
        # the Timer's only job is to set its 'finished' event after timeout
        timer = Timer(timeout, lambda: None)
        timer.start()
        for x in iterable:
            yield x
            if timer.finished.is_set():
                raise TimeoutError
        timer.cancel()  # don't forget it, otherwise the thread never finishes...
# registry of MultiMethod dispatchers, keyed by function name; filled by multimethod()
registry = {}
class MultiMethod(object):
    """dispatches a call to the function registered for the exact types of all args

    (after Guido van Rossum's "Five-minute Multimethods in Python")
    """

    def __init__(self, name):
        self.name = name      # name of the overloaded function
        self.typemap = {}     # maps tuple of argument types -> implementation

    def __call__(self, *args):
        types = tuple(arg.__class__ for arg in args)  # a generator expression!
        function = self.typemap.get(types)
        if function is None:
            raise TypeError("no match")
        return function(*args)

    def register(self, types, function):
        """register an implementation for a tuple of argument types

        :raise: TypeError if these types are already registered
        """
        if types in self.typemap:
            raise TypeError("duplicate registration")
        self.typemap[types] = function
def multimethod(*types):
    """allows to overload functions for various parameter types

    @multimethod(int, int)
    def foo(a, b):
        ...code for two ints...

    @multimethod(float, float)
    def foo(a, b):
        ...code for two floats...

    @multimethod(str, str)
    def foo(a, b):
        ...code for two strings...
    """
    def register(function):
        name = function.__name__
        mm = registry.get(name)
        if mm is None:
            # first overload of this name : create its dispatcher
            mm = registry[name] = MultiMethod(name)
        mm.register(types, function)
        return mm  # rebinding the name to the dispatcher enables the overloads
    return register