Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/python
import os, sys, thread, time

def __concurrent_batch(cmd,thread_num,completion_status_dict,exit_code_dict):
    """Helper routine for 'concurrent_batches'.

    Runs 'cmd' through os.system() and stores the result in
    exit_code_dict[thread_num].  completion_status_dict[thread_num] is set
    to 1 afterwards so the caller can count finished jobs with sum().

    If os.system() raises (e.g. 'cmd' is not a string), the exception still
    propagates, but the job is flagged as complete first and its exit code
    entry is left untouched.
    """

    try:
        exit_code_dict[thread_num] = os.system(cmd)
    finally:
        # Always flag completion: the caller polls this dictionary, and a
        # flag that is never set would keep it waiting indefinitely.
        completion_status_dict[thread_num] = 1  # for sum() routine

def concurrent_batches(batchlist,maxjobs=0,maxtime=0):
    """Run a list of batch commands simultaneously.

    'batchlist' is a list of strings suitable for submitting under os.system().
    Each job will run in a separate thread, with the thread ending when the
    subprocess ends.
    'maxjobs' will, if greater than zero, be the maximum number of jobs which
    can run concurrently.  This would be used to limit the number of
    processes where too many could flood a system, causing performance issues.
    Zero, None or a negative value means no limit.
    'maxtime', when greater than zero, is the maximum amount of time (seconds)
    that we will wait for processes to complete.  After that, we will return,
    but no jobs will be killed.  In other words, the jobs still running will
    continue to run, and hopefully finish in the course of time.

    example: concurrent_batches(("gzip abc","gzip def","gzip xyz"))

    returns: a dictionary of exit status codes, but only when ALL jobs are
             complete, or the maximum time has been exceeded.
             Note that if returning due to exceeding time, the dictionary will
             continue to be updated by the threads as they complete.
             The key of the dictionary is the thread number, which matches the
             index of the list of batch commands.  The value is the result of
             the os.system call, or None while the job has not finished.
             An empty 'batchlist' returns an empty dictionary.

    gotcha:  If both maxjobs and maxtime are set, there is a possibility that
             not all jobs will be submitted.  The only way to detect this will be
             by checking for the absence of the KEY in the returned dictionary.
    """

    if not batchlist: return {}
    completion_status_dict, exit_code_dict = {}, {}
    num_jobs = len(batchlist)
    # Zero, None and negative limits all mean "no limit".
    limit_jobs = bool(maxjobs) and maxjobs > 0
    limit_time = bool(maxtime) and maxtime > 0
    start_time = time.time()
    for thread_num, cmd in enumerate(batchlist):
        exit_code_dict[thread_num] = None
        completion_status_dict[thread_num] = 0 # for sum() routine
        thread.start_new_thread(__concurrent_batch,
              (cmd,thread_num,completion_status_dict,exit_code_dict))
        # thread_num is a 0-based index, so the number of jobs started so
        # far is one more than that.
        submitted = thread_num + 1
        while True:
            completed = sum(completion_status_dict.values())
            if completed == num_jobs:
                return exit_code_dict      # all done
            if limit_time and time.time() - start_time > maxtime:
                return exit_code_dict      # out of time; jobs keep running
            if submitted < num_jobs:       # more jobs left to submit ?
                running = submitted - completed
                if not limit_jobs or running < maxjobs:
                    break                  #  room for another, back to for loop
            # Either every job has been submitted (wait for them to finish)
            # or the concurrency limit has been reached (wait for a slot).
            time.sleep(.2)
    return exit_code_dict


if __name__ == "__main__":
    # Self-test: compress three scratch files in /tmp concurrently, next to
    # a job that outlives the 3 second time limit and a job that must fail.
    os.chdir("/tmp")
    for f in ("abc","def","xyz","abc.gz","def.gz","xyz.gz"):
        try:
            os.unlink(f)
        except OSError:
            pass    # best-effort cleanup; the file may simply not exist
    # Any content will do; close each file explicitly so the data is on
    # disk before gzip reads it.
    for f in ("abc","def","xyz"):
        with open(f,"w") as handle:
            handle.write(str(globals()))
    batches = ("gzip abc","gzip def","gzip xyz","sleep 5","gzip mustfail")
    ret = concurrent_batches(batches,maxtime=3)
    try:
        os.unlink("abc.gz")
        os.unlink("def.gz")
        os.unlink("xyz.gz")
    except OSError:
        print("Failed to delete compressed files, hence they were")
        print("not created, hence the concurrent routine has failed.")
        sys.exit(1)
    print(ret)     # prints {0: 0, 1: 0, 2: 0, 3: None, 4: 256}
    time.sleep(6)  # give the "sleep 5" job time to finish and update 'ret'
    print(ret)     # prints {0: 0, 1: 0, 2: 0, 3: 0, 4: 256}
    sys.exit(0)

History