
Submit a series of batch commands (as expected by the os.system() function). Each is submitted in a separate Python thread, but runs as a separate process, so the batch takes advantage of multi-core CPUs. A maximum number of simultaneous jobs can be set, to throttle the load. A maximum time can also be set, after which the routine returns even if jobs are still running; otherwise it waits until all jobs have completed.
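For example, to run three compressions throttled to at most two at a time, returning after ten seconds at the latest (a minimal sketch; the file names are placeholders):

    exit_codes = concurrent_batches(("gzip a.log", "gzip b.log", "gzip c.log"),
                                    maxjobs=2,   # at most two jobs at once
                                    maxtime=10)  # stop waiting after 10 seconds
    print exit_codes    # e.g. {0: 0, 1: 0, 2: 0} once all three have finished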

Python, 90 lines
#!/usr/bin/python
import os, sys, thread, time

def __concurrent_batch(cmd,thread_num,completion_status_dict,exit_code_dict):
    """Helper routine for 'concurrent_batches."""

    exit_code_dict[thread_num] = os.system(cmd)
    completion_status_dict[thread_num] = 1  # for sum() routine

def concurrent_batches(batchlist,maxjobs=0,maxtime=0):
    """Run a list of batch commands simultaneously.

    'batchlist' is a list of strings suitable for submitting under os.system().
    Each job will run in a separate thread, with the thread ending when the
    subprocess ends.
    'maxjobs' will, if greater than zero, be the maximum number of jobs allowed
    to run concurrently.  Use it to limit the number of processes where too
    many at once could flood a system and cause performance problems.
    'maxtime' will, if greater than zero, be the maximum amount of time (in
    seconds) that we will wait for processes to complete.  After that we
    return, but no jobs are killed.  In other words, the jobs still running
    will continue to run, and hopefully finish in the course of time.

    example: concurrent_batches(("gzip abc","gzip def","gzip xyz"))

    returns: a dictionary of exit status codes, but only when ALL jobs are
             complete, or the maximum time has been exceeded.
             Note that if returning due to exceeding time, the dictionary will
             continue to be updated by the threads as they complete.
             The key of the dictionary is the thread number, which matches the
             index of the list of batch commands.  The value is the result of
             the os.system call.

    gotcha:  If both maxjobs and maxtime are set, there is a possibility that
             not all jobs will be submitted.  The only way to detect this is to
             check for the absence of the key in the returned dictionary.
    """

    if not batchlist: return {}
    completion_status_dict, exit_code_dict = {}, {}
    num_jobs = len(batchlist)
    start_time = time.time()
    for thread_num, cmd in enumerate(batchlist):
        exit_code_dict[thread_num] = None
        completion_status_dict[thread_num] = 0 # for sum() routine
        thread.start_new_thread(__concurrent_batch,
              (cmd,thread_num,completion_status_dict,exit_code_dict))
        while True:
            completed = sum(completion_status_dict.values())
            if num_jobs == completed:
                return exit_code_dict      # all done
            running = thread_num - completed
            if maxtime > 0:
                if time.time() - start_time > maxtime:
                    return exit_code_dict
            if not maxjobs:
                if thread_num < num_jobs-1:  # have we submitted all jobs ?
                    break                  #  no, so break to for cmd loop
                else:
                    time.sleep(.2)         #  yes, so wait until jobs are complete
                    continue
            if running < maxjobs:
                break    # for next for loop
            time.sleep(.2)


if __name__ == "__main__":
    os.chdir("/tmp")
    for f in ("abc","def","xyz","abc.gz","def.gz","xyz.gz"):
        try:
            os.unlink(f)
        except OSError:
            pass
    open("abc","w").write(str(globals()))
    open("def","w").write(str(globals()))
    open("xyz","w").write(str(globals()))
    batches = ("gzip abc","gzip def","gzip xyz","sleep 5","gzip mustfail")
    ret = concurrent_batches(batches,maxtime=3)
    try:
        os.unlink("abc.gz")
        os.unlink("def.gz")
        os.unlink("xyz.gz")
    except OSError:
        print "Failed to delete compressed files, hence they were"
        print "not created, hence the concurrent routine has failed."
        sys.exit(1)
    print ret      # prints {0: 0, 1: 0, 2: 0, 3: None, 4: 256}
    time.sleep(6)
    print ret      # prints {0: 0, 1: 0, 2: 0, 3: 0, 4: 256}
    sys.exit(0)
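As the docstring's gotcha warns, when both maxjobs and maxtime are set, some jobs may never be submitted at all. A minimal sketch of detecting those after the fact (here 'batches' is whatever list you passed in):

    ret = concurrent_batches(batches, maxjobs=2, maxtime=3)
    never_submitted = [i for i in range(len(batches)) if i not in ret]
    if never_submitted:
        print "jobs never submitted:", never_submitted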

Sorry - my unit-test capabilities are not great, so the simple test routine will have to suffice.

2 comments

samtygier 15 years, 8 months ago

Handy script.

I noticed that it fails if you set maxjobs bigger than the number of jobs you have. In that case, once all the jobs have been submitted, the "if running < maxjobs:" test breaks us back out to the for loop, but there are no more jobs to submit, so the for loop finishes and the concurrent_batches() function finishes without returning anything.

Changing the line "if running < maxjobs:" to "if running < maxjobs and thread_num < num_jobs-1:" handles this case.

samtygier 15 years, 8 months ago

Also, the line that calculates the number of running processes is off by one, since thread_num starts at zero. Fix by changing "running = thread_num - completed" to "running = thread_num - completed + 1".
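For reference, here is the whole routine with both of samtygier's fixes folded in; an untested consolidation, otherwise identical to the recipe above:

    #!/usr/bin/python
    import os, thread, time

    def __concurrent_batch(cmd, thread_num, completion_status_dict, exit_code_dict):
        exit_code_dict[thread_num] = os.system(cmd)
        completion_status_dict[thread_num] = 1   # for sum() routine

    def concurrent_batches(batchlist, maxjobs=0, maxtime=0):
        """As the recipe above, with both fixes from the comments applied."""
        if not batchlist: return {}
        completion_status_dict, exit_code_dict = {}, {}
        num_jobs = len(batchlist)
        start_time = time.time()
        for thread_num, cmd in enumerate(batchlist):
            exit_code_dict[thread_num] = None
            completion_status_dict[thread_num] = 0
            thread.start_new_thread(__concurrent_batch,
                  (cmd, thread_num, completion_status_dict, exit_code_dict))
            while True:
                completed = sum(completion_status_dict.values())
                if num_jobs == completed:
                    return exit_code_dict               # all done
                running = thread_num - completed + 1    # fix: thread_num starts at 0
                if maxtime > 0 and time.time() - start_time > maxtime:
                    return exit_code_dict
                if not maxjobs:
                    if thread_num < num_jobs-1:         # have we submitted all jobs?
                        break                           #  no, submit the next one
                    time.sleep(.2)                      #  yes, wait for completion
                    continue
                if running < maxjobs and thread_num < num_jobs-1:  # fix: only break
                    break                               #  while jobs remain to submit
                time.sleep(.2)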