A script used to launch multiple import scripts using the multiprocessing module. It was developed to parallelize the loading of multiple log files into a database for aggregate analysis.
#!/usr/bin/env python
# importLogsM.py - Matt Keranen 2011 (mksql@yahoo.com)
import os, getpass, logging, multiprocessing, sys, time, traceback
import logImport # The import script written to import a single file
# Add logging to current execution path, formatter set to match multiprocessing logger.
# log_to_stderr() returns the multiprocessing logger with a stderr handler attached;
# a file handler is added below so the same records also land in a log file.
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)

# Name the log after the script, placed in sys.path[0]. Only the base name of
# sys.argv[0] is used: joining a relative path such as 'scripts/importLogsM.py'
# onto sys.path[0] would repeat the directory part and point at a folder that
# does not exist.
logf = os.path.join(
    sys.path[0],
    os.path.splitext(os.path.basename(sys.argv[0]))[0] + '.log')

filelog = logging.FileHandler(filename=logf)
filelog.setLevel(logging.INFO)
filelog.setFormatter(logging.Formatter('[%(levelname)s/%(module)s %(funcName)s] %(asctime)s | %(message)s'))
logger.addHandler(filelog)
def mapImport(infile, dstpath, server, engine, database, uid, pwd):
    """Import a single file and move it to *dstpath* when the import reports rc > 0.

    Pass multiple args to import from mapped file list (Python 2.6).

    infile   -- path of the file handed to logImport.importMP
    dstpath  -- directory the file is renamed into after a positive result
    server, engine, database, uid, pwd -- passed through unchanged to
                logImport.importMP

    Returns the value returned by logImport.importMP (presumably the number
    of rows loaded, since the caller sums and reports these as rows -- confirm
    against logImport).

    Any exception raised by the rename is logged and then re-raised.
    """
    # Let the import module log through this script's logger
    logImport.logger = logger
    rc = logImport.importMP(infile, server, engine, database, uid, pwd)
    if rc > 0:
        try:
            os.rename(infile, os.path.join(dstpath, os.path.basename(infile)))
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit in
            # the worker are not reported as rename errors
            logger.error("Error: %s" % traceback.format_exc(1))
            raise
    return rc
def mapArgs(args):
    """Expand one argument sequence into a positional call to mapImport.

    The pool's map() hands each worker call a single item, so every job is
    packed as one sequence and unpacked here. Returns mapImport's result.
    """
    result = mapImport(*args)
    return result
if __name__ == '__main__':
    # Credentials and source/destination folders for this run
    uid = 'import'
    pwd = getpass.getpass('Password for %s: ' % uid)
    src = 'F:\\import\\logs\\2011\\08'
    dst = 'F:\\import\\logs\\complete'

    # Build one argument list per file found under src: [file path] + shared args
    args = [dst, 'localhost', 's', 'testdb', uid, pwd]
    jobs = []
    for root, _dirs, files in os.walk(src):
        for name in files:
            jobs.append([os.path.join(root, name)] + args)

    if jobs:
        logger.info('Queueing %s files' % len(jobs))
        rc = []
        stime = time.time()
        pool = multiprocessing.Pool(4)
        try:
            rc = pool.map(mapArgs, jobs)
            pool.close()
            pool.join()
        except KeyboardInterrupt:  # http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt
            print('\nKeyboardInterrupt caught - terminating')
            pool.terminate()
            # Non-zero status so callers can tell the run did not complete
            sys.exit(1)
        except Exception:
            logger.error("Error: %s" % traceback.format_exc(1))
            pool.terminate()
            # Non-zero status so a failed import is not reported as success
            sys.exit(1)

        runtime = (time.time() - stime) + .001  # Avoid div/0
        rows = sum(rc)
        logger.info('Complete: %d rows, %d sec (%d r/s)' % (rows, runtime, rows / runtime))
    else:
        # Nothing to do: say so instead of exiting silently after the password prompt
        logger.info('No files found under %s' % src)
|