Welcome, guest | Sign In | My Account | Store | Cart
from threading import Thread, Event

class future:
    """
    Without thinking in thread creation the idea is to call several times 
    a function assigning a thread for each call with related parameters 
    and returning the list of results in a pretty pythonic way.
    For example, if we have already defined the function 'func':
    res = func(par)
    we want to call that several times and get the values in the future. So: 
    func(par1)
    func(par2)
    func(par3).get()
    func(par4).get_all()
    
    assigning a thread for each call. The 'get' method returns the first possible results,
    and 'get_all' returns all the values in a list. The decorator works fine with kwargs too.
    
    This recipe is based on:
    http://code.activestate.com/recipes/355651-implementing-futures-with-decorators/
    that use only one call for the function. The problem in that recipe is that each call blocks the execution.
    """
    def __init__(self, f):
        self.__f = f
        self.__init_values__()
    
    def __init_values__(self):
        self.__thread = []
        self.__vals = []
        self.__exception = None
        self.__curr_index = 0
        self.__event = Event()
        
    def __call__(self, *args, **kwargs):
        t = Thread(target=self.runfunc, args=args, kwargs=kwargs)
        t.start()
        self.__thread.append(t)
        return self
    
    def runfunc(self, *args, **kw):
        try:
            self.__vals.append(self.__f(*args, **kw))
            self.__event.set()
        except Exception, e:
            self.__exception = e

    def get(self):
        """
        Returns the first possible value without order of calling the function.
        """
        if self.__curr_index == len(self.__thread):
            raise IndexError('The element doesn\'t exists')
        if not self.__event.is_set():
            self.__event.wait()
        self.__event.clear()
        res = self.__vals[self.__curr_index]
        self.__curr_index += 1
        return res

    
    def get_all(self):
        """
        Returns all the possible values and initialize them.
        """
        for t in self.__thread:
            t.join()
        
        if self.__exception is not None:
            raise self.__exception

        res = self.__vals
        
        # Get rid of everything
        self.__init_values__()
        
        return res

    

if __name__ == '__main__':
    import time
    import unittest
    

    
    @future
    def sleeping(s, t):
        time.sleep(s)
        return 'slept for '+str(s) + ' sec dreaming: ' + str(t)
    
    sleeping(2, t='world')
    sleeping(5, 'sheeps')
    sleeping(1, t='soccer')
    print(sleeping(2, t='women').get())
    print(sleeping(1, 'beach').get_all())
    
    class FutureTestCase(unittest.TestCase):
        def tearDown(self):
            sleeping(0, '').get_all()
            
        def test_positive(self):
            sleeping(5, t='sheeps')
            sleeping(1, 'beach')
            o = sleeping(3, 'nothing')
            res = o.get_all()
            self.assertEqual(3, len(res))

        def test_bad_index(self):
            sleeping(5, t='sheeps')
            o = sleeping(4, 'world')
            o.get()
            o.get()
            self.assertRaises(IndexError, o.get)
            
    unittest.main(verbosity=2)
    
    

History