I got inspired to this class, after having read about the java SwingWorker. The problem is that you sometimes have a tough task you want to do in a GUI program, but you don't want the UI to lock. This recipe solves the problem by running the tough code in a background thread, while still letting you do useful interaction with it, like getting the progress or the latest results.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 | from threading import Lock, RLock, Thread, _MainThread, currentThread
import Queue
from gtk.gdk import threads_enter, threads_leave
from gobject import GObject, SIGNAL_RUN_FIRST
class GtkWorker (GObject, Thread):
__gsignals__ = {
"progressed": (SIGNAL_RUN_FIRST, None, (float,)),
"published": (SIGNAL_RUN_FIRST, None, (object,)),
"done": (SIGNAL_RUN_FIRST, None, ())
}
def __init__ (self, func):
""" Initialize a new GtkWorker around a specific function """
# WARNING: This deadlocks if calling code already has the gdk lock and
# is not the MainThread
if type(currentThread()) != _MainThread:
threads_enter()
GObject.__init__(self)
if type(currentThread()) != _MainThread:
threads_leave()
Thread.__init__(self)
self.setDaemon(True)
# By some reason we cannot access __gsignals__, so we have to do a
# little double work here
self.connections = {"progressed": 0, "published": 0, "done": 0}
self.handler_ids = {}
self.func = func
self.cancelled = False
self.done = False
self.progress = 0
########################################################################
# Publish and progress queues #
########################################################################
class Publisher (Thread):
SEND_LIST, SEND_LAST = range(2)
def __init__ (self, parrent, queue, signal, sendPolicy):
Thread.__init__(self)
self.setDaemon(True)
self.parrent = parrent
self.queue = queue
self.signal = signal
self.sendPolicy = sendPolicy
def run (self):
while True:
v = self.queue.get()
if v == None:
break
threads_enter()
l = [v]
while True:
try:
v = self.queue.get_nowait()
except Queue.Empty:
break
else: l.append(v)
try:
if self.sendPolicy == self.SEND_LIST:
self.parrent.emit(self.signal, l)
elif self.sendPolicy == self.SEND_LAST:
self.parrent.emit(self.signal, l[-1])
finally:
threads_leave()
self.publishQueue = Queue.Queue()
self.publisher = Publisher (
self, self.publishQueue, "published", Publisher.SEND_LIST)
self.publisher.start()
self.progressQueue = Queue.Queue()
self.progressor = Publisher (
self, self.progressQueue, "progressed", Publisher.SEND_LAST)
self.progressor.start()
############################################################################
# We override the connect/disconnect methods in order to count the number #
# of clients connected to each signal. #
# This is done for performance reasons, as some work can be skipped, if no #
# clients are connected anyways #
############################################################################
def _mul_connect (self, method, signal, handler, *args):
self.connections[signal] += 1
handler_id = method (self, signal, handler, *args)
self.handler_ids[handler_id] = signal
return handler_id
def connect (self, detailed_signal, handler, *args):
return self._mul_connect (GObject.connect,
detailed_signal, handler, *args)
def connect_after (self, detailed_signal, handler, *args):
return self._mul_connect (GObject.connect_after,
detailed_signal, handler, *args)
def connect_object (self, detailed_signal, handler, gobject, *args):
return self._mul_connect (GObject.connect_object,
detailed_signal, handler, gobject, *args)
def connect_object_after (self, detailed_signal, handler, gobject, *args):
return self._mul_connect (GObject.connect,
detailed_signal, handler, gobject, *args)
def disconnect (self, handler_id):
self.connections[self.handler_ids[handler_id]] -= 1
del self.handler_ids[handler_id]
return GObject.disconnect(self, handler_id)
handler_disconnect = disconnect
############################################################################
# The following methods (besides run()) are used to interact with the #
# worker #
############################################################################
def get (self, timeout=None):
""" 'get' will block until the processed function returns, timeout
happens, or the work is cancelled.
You can test if you were cancelled by the isCancelled() method, and
you can test if you reached the timeout by the isAlive() method.
Notice, cancelling will not make 'get' unblock, besides if you build
'isCancelled' calls into your function.
Warning: the get function assumes that if you are the MainThread you
have the gdklock and if you are not the MainThread you don't have
the gdklock.
If this is not true, and the work is not done, calling get will
result in a deadlock.
If you haven't used the gtk.gdk.threads_enter nor
gtk.gdk.threads_leave function, everything should be fine."""
if not self.isDone():
if type(currentThread()) == _MainThread:
threads_leave()
self.join(timeout)
if type(currentThread()) == _MainThread:
threads_enter()
if self.isAlive():
return None
self.done = True
return self.result
def execute (self):
""" Start the worker """
if not self.isDone():
self.start()
def run (self):
self.result = self.func(self)
self.done = True
if self.connections["done"] >= 1:
threads_enter()
# In python 2.5 we can use self.publishQueue.join() to wait for all
# publish items to have been processed.
self.emit("done")
threads_leave()
def cancel (self):
""" Cancel work.
As python has no way of trying to interupt a thread, we don't try
to do so. The cancelled attribute is simply set to true, which means
that no more signals are emitted.
You can build 'isCancelled' calls into your function, to help it
exit when it doesn't need to run anymore.
while not worker.isCancelled():
...
"""
self.cancelled = True
self.done = True
############################################################################
# Get stuf #
############################################################################
def isCancelled (self):
return self.cancelled
def isDone (self):
return self.done
def getProgress (self):
return self.progress
############################################################################
# These methods are used by the function to indicate progress and publish #
# process #
############################################################################
def setProgress (self, progress):
""" setProgress should be called from inside the processed function.
When the gdklock gets ready, it will emit the "progressed" signal,
with the value of the latest setProgress call """
if self.isCancelled():
return
if self.progress != progress:
self.progress = progress
self.progressQueue.put(progress)
def publish (self, val):
""" Publish should be called from inside the processed function. It will
queue up the latest results, and when we get access to the gdklock,
it will emit the "published" signal. """
if self.connections["published"] < 1 or self.isCancelled():
return
self.publishQueue.put(val)
############################################################################
# Other #
############################################################################
def __del__ (self):
self.cancel()
################################################################################
# Demo usage #
################################################################################
if __name__ == "__main__":
def findPrimes (worker):
from math import sqrt
limit = 10**4.
primes = []
for n in xrange(2, int(limit)+1):
for p in primes:
if worker.isCancelled():
return primes
if p > n**2:
break
if n % p == 0:
break
else:
primes.append(n)
worker.publish(n)
worker.setProgress(n/limit)
return primes
import gtk
w = gtk.Window()
vbox = gtk.VBox()
w.add(vbox)
worker = GtkWorker(findPrimes)
sbut = gtk.Button("Start")
def callback (button, *args):
sbut.set_sensitive(False)
worker.execute()
sbut.connect("clicked", callback)
vbox.add(sbut)
cbut = gtk.Button("Cancel")
def callback (button, *args):
cbut.set_sensitive(False)
worker.cancel()
cbut.connect("clicked", callback)
vbox.add(cbut)
gbut = gtk.Button("Get")
def callback (button, *args):
gbut.set_sensitive(False)
print "Found:", worker.get()
gbut.connect("clicked", callback)
vbox.add(gbut)
prog = gtk.ProgressBar()
def callback (worker, progress):
prog.set_fraction(progress)
worker.connect("progressed", callback)
vbox.add(prog)
field = gtk.Entry()
def process (worker, primes):
field.set_text(str(primes[-1]))
worker.connect("published", process)
vbox.add(field)
def done (worker):
print "Finished, Cancelled:", worker.isCancelled()
worker.connect("done", done)
w.connect("destroy", gtk.main_quit)
w.show_all()
gtk.gdk.threads_init()
gtk.main()
|
It might be possible to implement a version of the GtkWorker based on processes. A such version would more easily be able to kill cancelled tasks for the price of having a harder time transferring work results, such as a network connection.
Generally this shouldn't be much of a problem, if you build you function around a "while not worker.isCancelled()" loop, such as the one in the findPrimes function.