Welcome, guest | Sign In | My Account | Store | Cart

I was inspired to write this class after reading about Java's SwingWorker. The problem is that you sometimes have a heavy task to run in a GUI program, but you don't want the UI to lock up while it runs. This recipe solves the problem by running the heavy code in a background thread, while still letting you interact with it in useful ways, such as getting the progress or the latest results.

Python, 289 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from threading import Lock, RLock, Thread, _MainThread, currentThread
import Queue
from gtk.gdk import threads_enter, threads_leave
from gobject import GObject, SIGNAL_RUN_FIRST

class GtkWorker (GObject, Thread):
    """ Run a function in a background thread and report back through GObject
        signals, so a gtk UI stays responsive while the function works.
        
        Signals:
            "progressed" (float)  - the latest value passed to setProgress()
            "published"  (object) - a list of the values passed to publish()
                                    since the previous emission
            "done"       ()       - the wrapped function has returned
        
        All three signals are emitted from helper threads while the gdk lock
        is held (threads_enter/threads_leave). """
    
    __gsignals__ = {
        "progressed": (SIGNAL_RUN_FIRST, None, (float,)),
        "published":  (SIGNAL_RUN_FIRST, None, (object,)),
        "done":       (SIGNAL_RUN_FIRST, None, ())
    }
    
    def __init__ (self, func):
        """ Initialize a new GtkWorker around a specific function.
            
            func is called as func(worker) in the worker thread once execute()
            has been called; its return value is what get() returns.
            Two daemon helper threads, delivering the "published" and
            "progressed" signals, are started immediately. """
        
        # WARNING: This deadlocks if calling code already has the gdk lock and
        # is not the MainThread
        needLock = not isinstance(currentThread(), _MainThread)
        if needLock:
            threads_enter()
        try:
            GObject.__init__(self)
        finally:
            # Always hand the gdk lock back, even if GObject.__init__ raises
            if needLock:
                threads_leave()
        
        Thread.__init__(self)
        self.setDaemon(True)
        
        # By some reason we cannot access __gsignals__, so we have to do a
        # little double work here
        self.connections = {"progressed": 0, "published": 0, "done": 0}
        self.handler_ids = {}
        
        self.func = func
        self.cancelled = False
        self.done = False
        self.progress = 0
        # Defined up front so that get() has something to return when the
        # worker is cancelled before the function has produced a result
        self.result = None
        
        ########################################################################
        # Publish and progress queues                                          #
        ########################################################################
        
        class Publisher (Thread):
            """ Daemon thread that takes values off a queue and emits them on
                the parent worker as a signal, holding the gdk lock while
                doing so.
                SEND_LIST emits the whole batch as a list, SEND_LAST emits
                only the newest value of the batch. """
            SEND_LIST, SEND_LAST = range(2)
            
            def __init__ (self, parrent, queue, signal, sendPolicy):
                Thread.__init__(self)
                self.setDaemon(True)
                self.parrent = parrent
                self.queue = queue
                self.signal = signal
                self.sendPolicy = sendPolicy
            
            def run (self):
                while True:
                    v = self.queue.get()
                    # A None read here stops the thread. Identity is tested,
                    # as queued objects may define their own __eq__.
                    if v is None:
                        break
                    threads_enter()
                    try:
                        # Collect everything that was queued while we were
                        # waiting for the gdk lock, to emit it in one batch
                        l = [v]
                        while True:
                            try:
                                v = self.queue.get_nowait()
                            except Queue.Empty:
                                break
                            l.append(v)
                        if self.sendPolicy == self.SEND_LIST:
                            self.parrent.emit(self.signal, l)
                        elif self.sendPolicy == self.SEND_LAST:
                            self.parrent.emit(self.signal, l[-1])
                    finally:
                        threads_leave()
        
        self.publishQueue = Queue.Queue()
        self.publisher = Publisher (
                self, self.publishQueue, "published", Publisher.SEND_LIST)
        self.publisher.start()
        
        self.progressQueue = Queue.Queue()
        self.progressor = Publisher (
                self, self.progressQueue, "progressed", Publisher.SEND_LAST)
        self.progressor.start()
    
    ############################################################################
    # We override the connect/disconnect methods in order to count the number  #
    # of clients connected to each signal.                                     #
    # This is done for performance reasons, as some work can be skipped, if no #
    # clients are connected anyways                                            #
    ############################################################################
    
    def _mul_connect (self, method, signal, handler, *args):
        """ Connect handler to signal through the given unbound GObject
            connect method, and record the connection so it can be counted.
            Returns the handler id given by GObject. """
        self.connections[signal] += 1
        handler_id = method (self, signal, handler, *args)
        self.handler_ids[handler_id] = signal
        return handler_id
    
    def connect (self, detailed_signal, handler, *args):
        """ Counting version of GObject.connect """
        return self._mul_connect (GObject.connect,
                detailed_signal, handler, *args)
    def connect_after (self, detailed_signal, handler, *args):
        """ Counting version of GObject.connect_after """
        return self._mul_connect (GObject.connect_after,
                detailed_signal, handler, *args)
    def connect_object (self, detailed_signal, handler, gobject, *args):
        """ Counting version of GObject.connect_object """
        return self._mul_connect (GObject.connect_object,
                detailed_signal, handler, gobject, *args)
    def connect_object_after (self, detailed_signal, handler, gobject, *args):
        """ Counting version of GObject.connect_object_after """
        return self._mul_connect (GObject.connect_object_after,
                detailed_signal, handler, gobject, *args)
    
    def disconnect (self, handler_id):
        """ Disconnect a handler connected through one of the connect methods
            of this class, and decrease the count for its signal.
            Raises KeyError if handler_id was not recorded by this class. """
        self.connections[self.handler_ids[handler_id]] -= 1
        del self.handler_ids[handler_id]
        return GObject.disconnect(self, handler_id)
    handler_disconnect = disconnect
    
    ############################################################################
    # The following methods (besides run()) are used to interact with the      #
    # worker                                                                   #
    ############################################################################
    
    def get (self, timeout=None):
        """ 'get' will block until the processed function returns or the
            timeout happens. If the worker is already done or cancelled, it
            returns at once.
            Returns the result of the function, or None if the timeout was
            reached or no result has been produced.
            You can test if you were cancelled by the isCancelled() method, and
            you can test if you reached the timeout by the isAlive() method.
            Notice, cancelling will not make 'get' unblock, besides if you build
            'isCancelled' calls into your function.
            
            Warning: the get function assumes that if you are the MainThread you
            have the gdklock and if you are not the MainThread you don't have
            the gdklock.
            If this is not true, and the work is not done, calling get will
            result in a deadlock.
            If you haven't used the gtk.gdk.threads_enter nor
            gtk.gdk.threads_leave function, everything should be fine."""
        
        if not self.isDone():
            inMain = isinstance(currentThread(), _MainThread)
            # Let go of the gdk lock while waiting, so that the helper threads
            # are able to emit their signals
            if inMain:
                threads_leave()
            try:
                self.join(timeout)
            finally:
                # Take the lock back even if join raises, e.g. because the
                # worker was never started
                if inMain:
                    threads_enter()
            if self.isAlive():
                return None
            self.done = True
        return self.result
    
    def execute (self):
        """ Start the worker, unless it is already done or cancelled """
        if not self.isDone():
            self.start()
    
    def run (self):
        """ Thread body: call the function, store its result and emit "done"
            if anybody is connected to that signal """
        self.result = self.func(self)
        self.done = True
        if self.connections["done"] >= 1:
            threads_enter()
            try:
                # In python 2.5 we can use self.publishQueue.join() to wait for
                # all publish items to have been processed.
                self.emit("done")
            finally:
                threads_leave()
    
    def cancel (self):
        """ Cancel work.
            As python has no way of trying to interupt a thread, we don't try
            to do so. The cancelled attribute is simply set to true, which means
            that setProgress and publish stop queueing new values. The "done"
            signal is still emitted when the function returns.
            You can build 'isCancelled' calls into your function, to help it
            exit when it doesn't need to run anymore.
            while not worker.isCancelled():
                ...
            """
        self.cancelled = True
        self.done = True
    
    ############################################################################
    # Get stuf                                                                 #
    ############################################################################
    
    def isCancelled (self):
        """ True if cancel() has been called """
        return self.cancelled
    
    def isDone (self):
        """ True if the function has returned or the worker was cancelled """
        return self.done
    
    def getProgress (self):
        """ The latest value given to setProgress, 0 if there is none yet """
        return self.progress
    
    ############################################################################
    # These methods are used by the function to indicate progress and publish  #
    # process                                                                  #
    ############################################################################
    
    def setProgress (self, progress):
        """ setProgress should be called from inside the processed function.
            When the gdklock gets ready, it will emit the "progressed" signal,
            with the value of the latest setProgress call.
            Nothing happens if the worker is cancelled or the value is the same
            as the previous one. """
        if self.isCancelled():
            return
        if self.progress != progress:
            self.progress = progress
            self.progressQueue.put(progress)
    
    def publish (self, val):
        """ Publish should be called from inside the processed function. It will
            queue up the latest results, and when we get access to the gdklock,
            it will emit the "published" signal.
            The value is dropped if nobody is connected to "published" or the
            worker is cancelled. """
        if self.connections["published"] < 1 or self.isCancelled():
            return
        self.publishQueue.put(val)
    
    ############################################################################
    # Other                                                                    #
    ############################################################################
    
    def __del__ (self):
        # Mark the work as cancelled when the worker object goes away
        self.cancel()

################################################################################
# Demo usage                                                                   #
################################################################################

if __name__ == "__main__":
    def findPrimes (worker):
        from math import sqrt
        limit = 10**4.
        primes = []
        for n in xrange(2, int(limit)+1):
            for p in primes:
                if worker.isCancelled():
                    return primes
                if p > n**2:
                    break
                if n % p == 0:
                    break
            else:
                primes.append(n)
                worker.publish(n)
            worker.setProgress(n/limit)
        return primes
    
    import gtk
    w = gtk.Window()
    vbox = gtk.VBox()
    w.add(vbox)
    
    worker = GtkWorker(findPrimes)
    
    sbut = gtk.Button("Start")
    def callback (button, *args):
        sbut.set_sensitive(False)
        worker.execute()
    sbut.connect("clicked", callback)
    vbox.add(sbut)
    
    cbut = gtk.Button("Cancel")
    def callback (button, *args):
        cbut.set_sensitive(False)
        worker.cancel()
    cbut.connect("clicked", callback)
    vbox.add(cbut)
    
    gbut = gtk.Button("Get")
    def callback (button, *args):
        gbut.set_sensitive(False)
        print "Found:", worker.get()
    gbut.connect("clicked", callback)
    vbox.add(gbut)
    
    prog = gtk.ProgressBar()
    def callback (worker, progress):
        prog.set_fraction(progress)
    worker.connect("progressed", callback)
    vbox.add(prog)
    
    field = gtk.Entry()
    def process (worker, primes):
        field.set_text(str(primes[-1]))
    worker.connect("published", process)
    vbox.add(field)
    
    def done (worker):
        print "Finished, Cancelled:", worker.isCancelled()
    worker.connect("done", done)
    
    w.connect("destroy", gtk.main_quit)
    w.show_all()
    gtk.gdk.threads_init()
    gtk.main()

It might be possible to implement a version of the GtkWorker based on processes. Such a version could more easily kill cancelled tasks, at the price of making it harder to transfer work results, such as a network connection.

Generally this shouldn't be much of a problem if you build your function around a "while not worker.isCancelled()" loop, or check worker.isCancelled() regularly as the findPrimes function does.