import time
from threading import Thread, Condition, Lock
from Queue import Queue, Empty
from functools import partial
class CmdThread(Thread):
def __init__(self):
Thread.__init__(self)
self._queue = Queue()
self._status = 'NORMAL'
self._statusLock = Lock()
self._possibleStatus = ('NORMAL', 'SLEEPING', 'DONE')
self._condition = Condition(self._statusLock)
self.daemon = True
self.start()
def run(self):
while True:
self._condition.acquire()
if self._status == 'DONE':
self._condition.release()
break
if self._status == 'SLEEPING':
self._condition.wait() # this function will release the lock when going to sleep
self._condition.release()
continue
self._condition.release()
try:
task = self._queue.get_nowait()
except Empty:
self._condition.acquire()
self._status = 'SLEEPING'
self._condition.wait()
# Here, we don't change the status to 'NORMAL' here
# The status is supposed to be changed by the waker
# Before this command thread wakes up.
self._condition.release()
continue
if callable(task):
try:
print 'perform task.'
task()
except Exception:
self._statusLock.acquire()
self._status = 'DONE'
self._statusLock.release()
raise
else:
# you can define the task interface here.
pass
def addCmd(self, callableObj, *args, **argd):
""" non-blocking call.
"""
assert(callable(callableObj))
self._condition.acquire()
if self._status == 'DONE':
self._condition.release()
return
self._queue.put(partial(callableObj, *args, **argd))
self._status = 'NORMAL'
self._condition.notifyAll()
self._condition.release()
def close(self, timeout=None):
""" blocking call, return after close/cancel is done.
"""
undoneJobs = []
self._condition.acquire()
if self._status == 'DONE':
self._condition.release()
return
self._status = 'DONE'
self._condition.notifyAll()
while not self._queue.empty():
undoneJobs.append(self._queue.get())
self._condition.release()
self.join(timeout)
return undoneJobs
def pause(self, timeout=None):
""" non-blocking call.
"""
self._condition.acquire()
if self._status == 'DONE':
self._condition.release()
return
self._status = 'SLEEPING'
self._condition.notifyAll()
self._condition.release()
def resume(self):
""" non-blocking call.
"""
self._condition.acquire()
if self._status == 'DONE':
self._condition.release()
return
self._status = 'NORMAL'
self._condition.notifyAll()
self._condition.release()
def hasCmd(self):
""" non-blocking call.
"""
return not self._queue.empty()
def demo():
def exampleTask():
print 'job beginning...'
time.sleep(2)
print 'job finishing...'
cmdObj = CmdThread()
for i in xrange(5):
cmdObj.addCmd(exampleTask)
return cmdObj
if __name__=='__main__':
cmd = demo()
cmd.pause()
print 'main thread is going to sleep.'
time.sleep(3)
print 'main threading is going to close the cmd thread.'
print cmd.close()
print 'main thread finishes closing the cmd thread.'