Submit a series of batch commands (strings as expected by the os.system() function). Each is submitted in a separate Python thread, but runs as a separate process, so it can take advantage of multi-core CPUs. A maximum number of simultaneous jobs can be set (throttling), and a maximum time can be set after which the routine returns; otherwise it waits until all jobs have completed.
#!/usr/bin/python
import os, sys, thread, time

def __concurrent_batch(cmd, thread_num, completion_status_dict, exit_code_dict):
    """Helper routine for 'concurrent_batches'."""
    exit_code_dict[thread_num] = os.system(cmd)
    completion_status_dict[thread_num] = 1      # for sum() routine

def concurrent_batches(batchlist, maxjobs=0, maxtime=0):
    """Run a list of batch commands simultaneously.

    'batchlist' is a list of strings suitable for submitting under os.system().
    Each job will run in a separate thread, with the thread ending when the
    subprocess ends.

    'maxjobs', if greater than zero, is the maximum number of jobs allowed to
    run concurrently.  Use it to limit the number of processes where too many
    could flood a system, causing performance issues.

    'maxtime', if greater than zero, is the maximum time (in seconds) that we
    will wait for the processes to complete.  After that we return, but no
    jobs are killed.  In other words, the jobs still running will continue to
    run, and hopefully finish in the course of time.

    example: concurrent_batches(("gzip abc", "gzip def", "gzip xyz"))

    returns: a dictionary of exit status codes, but only when ALL jobs are
             complete, or the maximum time has been exceeded.  Note that if
             we return due to exceeding the time, the dictionary will
             continue to be updated by the threads as they complete.  The
             key of the dictionary is the thread number, which matches the
             index of the list of batch commands.  The value is the result
             of the os.system() call.

    gotcha:  if both maxjobs and maxtime are set, there is a possibility
             that not all jobs will be submitted.  The only way to detect
             this is to check for the absence of the key in the returned
             dictionary.
    """
    if not batchlist:
        return {}
    completion_status_dict, exit_code_dict = {}, {}
    num_jobs = len(batchlist)
    start_time = time.time()
    for thread_num, cmd in enumerate(batchlist):
        exit_code_dict[thread_num] = None
        completion_status_dict[thread_num] = 0  # for sum() routine
        thread.start_new_thread(__concurrent_batch,
                                (cmd, thread_num,
                                 completion_status_dict, exit_code_dict))
        while True:
            completed = sum(completion_status_dict.values())
            if num_jobs == completed:
                return exit_code_dict           # all done
            running = thread_num - completed
            if maxtime > 0:
                if time.time() - start_time > maxtime:
                    return exit_code_dict
            if not maxjobs:
                if thread_num < num_jobs - 1:   # have we submitted all jobs?
                    break                       # no: submit the next one
                else:
                    time.sleep(.2)              # yes: wait until jobs complete
                    continue
            if running < maxjobs:
                break                           # room to submit the next job
            time.sleep(.2)

if __name__ == "__main__":
    os.chdir("/tmp")
    for f in ("abc", "def", "xyz", "abc.gz", "def.gz", "xyz.gz"):
        try:
            os.unlink(f)
        except OSError:
            pass
    open("abc", "w").write(str(globals()))
    open("def", "w").write(str(globals()))
    open("xyz", "w").write(str(globals()))
    batches = ("gzip abc", "gzip def", "gzip xyz", "sleep 5", "gzip mustfail")
    ret = concurrent_batches(batches, maxtime=3)
    try:
        os.unlink("abc.gz")
        os.unlink("def.gz")
        os.unlink("xyz.gz")
    except OSError:
        print "Failed to delete compressed files, hence they were"
        print "not created, hence the concurrent routine has failed."
        sys.exit(1)
    print ret       # prints {0: 0, 1: 0, 2: 0, 3: None, 4: 256}
    time.sleep(6)
    print ret       # prints {0: 0, 1: 0, 2: 0, 3: 0, 4: 256}
    sys.exit(0)
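The docstring's return-value and "gotcha" notes can be made concrete. Here is a minimal sketch (Python 3 syntax, using a made-up result dictionary shaped like the test run's output) of how a caller might inspect what comes back:

```python
# Hypothetical result, shaped like the recipe's example output: the key is
# the job index, the value is the os.system() result, and None means the
# job had not finished when the routine returned.
ret = {0: 0, 1: 0, 2: 0, 3: None, 4: 256}
num_jobs = 5

# The "gotcha": a job that was never submitted has no key at all.
never_submitted = [i for i in range(num_jobs) if i not in ret]
unfinished = [i for i, code in ret.items() if code is None]
failed = [i for i, code in ret.items() if code not in (None, 0)]

assert never_submitted == []
assert unfinished == [3]
assert failed == [4]
```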
Sorry - my unit-test capabilities are not great, so the simple test routine will have to suffice.
Handy script.
I noticed that it fails if you set maxjobs larger than the number of jobs you have. In that case, once all the jobs have been submitted, the "if running < maxjobs:" test breaks us back out to the for loop, but there are no more jobs to submit, so the for loop finishes and concurrent_batches() returns without a result.
Changing that test to "if running < maxjobs and thread_num < num_jobs-1:" handles this case.
Also, the line that calculates the number of running processes is off by one, since thread_num starts at zero. Fix it by changing "running = thread_num - completed" to: running = thread_num - completed + 1
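For what it's worth, the same throttle-plus-timeout pattern can be sketched on Python 3 with the standard library's concurrent.futures, which removes the manual polling loop and its off-by-one hazards. The name run_batches is illustrative, not a drop-in replacement for the recipe:

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor, wait

def run_batches(batchlist, maxjobs=0, maxtime=None):
    """Run shell commands concurrently; return {index: exit code}.

    maxjobs > 0 caps the number of simultaneous jobs; maxtime (seconds)
    caps how long we wait. Like the recipe, nothing is killed on timeout:
    jobs still running keep updating the dictionary as they finish.
    """
    results = {}

    def run(index, cmd):
        # shell=True mirrors os.system(); note subprocess.call returns the
        # plain exit code, not os.system()'s raw wait status (e.g. 256).
        results[index] = subprocess.call(cmd, shell=True)

    workers = maxjobs if maxjobs > 0 else max(len(batchlist), 1)
    pool = ThreadPoolExecutor(max_workers=workers)
    futures = [pool.submit(run, i, cmd) for i, cmd in enumerate(batchlist)]
    wait(futures, timeout=maxtime)   # returns early if maxtime expires
    pool.shutdown(wait=False)        # don't block; queued jobs still run
    return results
```

On a POSIX system, run_batches(["true", "false"]) should come back as {0: 0, 1: 1}.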