
This recipe presents the sync class and an example of its use. The primary ingredient is the lock type from the thread module. The idea is simple. Picture a room with a front door and a back door. Threads walk in through the front door and out through the back door. The room has a fixed capacity for threads (specified when the room [sync object] is constructed), and the back door starts out closed. When the room is full, the front door is closed and the back door is opened. When the room has emptied, the front door is opened and the back door is closed. This way, all of your threads pile into the room, and once the last one has arrived they are all released together. The example function shows two different sync objects being used in different roles.

Python, 45 lines
import thread

class sync:

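    def __init__(self, threads):
        self.__threads = threads              # capacity of the room
        self.__count = 0                      # threads currently in the room
        self.__main = thread.allocate_lock()  # front door, starts open
        self.__exit = thread.allocate_lock()  # back door
        self.__exit.acquire()                 # back door starts closed

    def sync(self):
        self.__main.acquire()                 # walk in through the front door
        self.__count += 1
        if self.__count < self.__threads:
            self.__main.release()             # room not full: let the next thread in
        else:
            self.__exit.release()             # room full: front door stays shut, back door opens
        self.__exit.acquire()                 # wait at the back door, leave one at a time
        self.__count -= 1
        if self.__count > 0:
            self.__exit.release()             # others still inside: hand the back door on
        else:
            self.__main.release()             # last one out: back door stays shut, front reopens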

def example():
    def get_input(share):
        while share[0]:
            share[1] = raw_input('Please say something.\n')
            share[2].sync()
        share[3].sync()
    def do_output(share):
        while share[0]:
            share[2].sync()
            print 'You said, "%s"' % share[1]
        share[3].sync()
    share = [True, None, sync(2), sync(3)]
    thread.start_new_thread(get_input, (share,))
    thread.start_new_thread(do_output, (share,))
    import time; time.sleep(60)
    share[0] = False
    share[3].sync()

if __name__ == '__main__':
    example()

Why would someone want to use the sync class? Sometimes when writing a multi-threaded application, it is desirable for your threads to eventually synchronize with each other so that they are all at the same point in execution. This can be difficult without a dedicated mechanism, which is where the sync class comes into play. A sync object can be shared by several threads; if it knows how many threads share it, it can synchronize them automatically when used properly. As shown in the example function, threads that wish to synchronize call the sync method on the sync object, and synchronization happens automatically (assuming the other threads sharing the sync object call sync as well). This is a simple implementation of the synchronization concept and could be extended with more options (such as a maximum blocking time). If you write threaded programs, hopefully this will be a useful recipe for you.
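
The "maximum blocking time" extension can be illustrated with threading.Barrier, which was added to the standard library in Python 3.2 and plays the same role as the sync class. This is only a sketch, assuming Python 3 rather than the Python 2 thread module used in the recipe; the names worker, arrivals, and lonely are made up for the illustration.

```python
import threading

PARTIES = 3
barrier = threading.Barrier(PARTIES)   # reusable, much like a sync object
arrivals = []                          # indices handed back by wait()
arrivals_lock = threading.Lock()

def worker():
    # Blocks until PARTIES threads have called wait(); each caller
    # receives a distinct index in range(PARTIES).
    index = barrier.wait(timeout=5.0)
    with arrivals_lock:
        arrivals.append(index)

threads = [threading.Thread(target=worker) for _ in range(PARTIES)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# A barrier that never fills up: wait() gives up after the timeout and
# raises BrokenBarrierError instead of blocking forever.
lonely = threading.Barrier(2)
try:
    lonely.wait(timeout=0.1)
    timed_out = False
except threading.BrokenBarrierError:
    timed_out = True
```

Once a wait times out, the barrier is left in a broken state, so a design along these lines needs to decide whether to reset the barrier or abandon it.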

3 comments

Stephen Chappell (author) 15 years, 8 months ago

Bad Example? If you want to get the same results with less code try this.

from time import clock
time = clock()
while clock() - time

Raymond Hettinger 15 years, 8 months ago

Interesting idea. Example broken. Core concept limited to step-wise parallel ops. I'm intrigued by the sync concept and would enjoy seeing more convincing examples.

The sync(3) part of the example doesn't add anything new or better. Using .join() is the preferred way to wait on multiple threads to terminate. So, if sync is to show its worth, it will have to be with threads that carry on after synchronization (as in the sync(2) example).
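
For comparison, waiting for termination with .join() might look like the sketch below. It assumes Python 3 and the higher-level threading module (the recipe itself uses only the low-level thread module, which has no join); worker and results are hypothetical names.

```python
import threading

results = []
results_lock = threading.Lock()

def worker(n):
    # Stand-in for real work: record the square of n.
    with results_lock:
        results.append(n * n)

threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()   # returns once that particular thread has terminated

all_done = not any(t.is_alive() for t in threads)
```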

The sync(2) part of the example is broken. Potentially, the input thread could mutate the value again before the output thread has a chance to print. This is easily seen if you insert a time.sleep(60) before the print or if you pass a large value to sys.setcheckinterval(). The RightWay(tm) for this kind of synchronized data passing is to use the Queue module (with a blocking get() before output) or to use a condition variable (with the lock held until after the print).
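
A minimal sketch of the queue-based hand-off, assuming Python 3 (where the module is named queue rather than Queue) and replacing the interactive input with a fixed list so it can run unattended. SENTINEL, producer, consumer, and printed are hypothetical names.

```python
import queue
import threading

SENTINEL = object()  # hypothetical marker telling the consumer to stop

def producer(q, lines):
    for line in lines:
        q.put(line)        # each value is handed over, never overwritten
    q.put(SENTINEL)

def consumer(q, printed):
    while True:
        line = q.get()     # blocks until the producer has put something
        if line is SENTINEL:
            break
        printed.append('You said, "%s"' % line)

q = queue.Queue()
printed = []
lines = ["hello", "world"]
t1 = threading.Thread(target=producer, args=(q, lines))
t2 = threading.Thread(target=consumer, args=(q, printed))
t1.start()
t2.start()
t1.join()
t2.join()
```

Because every value travels through the queue, the consumer sees each line exactly once and in order, no matter how the two threads are scheduled.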

The error with the sync(2) example indicates that syncing is less broadly applicable than suggested. What value is there in syncing if two or more racing threads immediately resume their race after the synchronization step?

The answer that springs to mind is where one or more threads create some values or conditions that can then be relied on by all threads as having been done. When the threads resume, the producer can go on to do something else but needs to take care not to alter or overwrite whatever was just produced (as it has no way of knowing when other threads are going to read the value).

Given this limitation, Queues or condition variables would always be preferred for handling simple producer/consumer relationships. In contrast, syncing would be of value for more interdependent step-wise relationships where all threads can work in parallel to build on the results of a previous step. For example, in constructing a multi-floor building, threads for framing walls can proceed in parallel but cannot go on to build the next floor until the current level is complete.
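
The multi-floor building can be sketched as follows, with Python 3's threading.Barrier standing in for the sync class (an assumption; a sync object would be used the same way). The names framer, log, WORKERS, and FLOORS are made up for the illustration.

```python
import threading

WORKERS = 4
FLOORS = 3
barrier = threading.Barrier(WORKERS)
log = []                      # (floor, worker_id) in completion order
log_lock = threading.Lock()

def framer(worker_id):
    for floor in range(FLOORS):
        with log_lock:
            log.append((floor, worker_id))   # "frame a wall" on this floor
        barrier.wait()        # nobody starts the next floor early

threads = [threading.Thread(target=framer, args=(i,)) for i in range(WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

floors_in_order = [floor for floor, _ in log]
```

Within a floor the workers may finish in any order, but every entry for one floor is logged before any entry for the next.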

Stephen Chappell (author) 15 years, 8 months ago

Second Example.

import os, random, sync, time

SLOTS = 10

def example():
    def writer(slots, select, stop):
        while True:
            if not random.randint(0, 10 ** 5):
                stop.sync()
                time.sleep(5)
            slots[select] = (slots[select] + 1) % 10
    def reader(slots):
        while True:
            if os.name == 'nt':
                os.system('cls')
            elif os.name == 'posix':
                os.system('clear')
            for slot in slots:
                print slot,
    slots = [0 for slot in range(SLOTS)]
    stop = sync.sync(len(slots))
    sync.thread.start_new_thread(reader, (slots,))
    for select in range(len(slots)):
        sync.thread.start_new_thread(writer, (slots, select, stop))
    sync.sync(2).sync()

if __name__ == '__main__':
    example()

What does it do? This simulates a slot machine with SLOTS number of slots where slots stop independently of each other. The slot machine automatically starts up again five seconds after all slots have stopped. The last line in the example function is only meant to block indefinitely.

With regard to the original code:

  1. If someone wanted the do_output thread to wait before printing, then they are programming with the possibility that everything from get_input might not be printed. It is the programmer's responsibility to ensure the correctness of the code written.

  2. Notice that the threading module was NOT imported. By the design of the code, joining was not an option for the threads. That is why sync(3) was introduced -- to show a method of synchronizing the termination of threads.