
A script that launches multiple import scripts in parallel using the multiprocessing module, developed to parallelize the loading of multiple log files into a database for aggregate analysis.

Python, 68 lines
#!/usr/bin/env python

# importLogsM.py - Matt Keranen 2011 (mksql@yahoo.com)

import os, getpass, logging, multiprocessing, sys, time, traceback
import logImport # The import script written to import a single file

# Add logging to current execution path, formatter set to match multiprocessing logger
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)

logf = os.path.join(sys.path[0], os.path.splitext(sys.argv[0])[0]+'.log')
filelog = logging.FileHandler(filename=logf)
filelog.setLevel(logging.INFO)
filelog.setFormatter(logging.Formatter('[%(levelname)s/%(module)s %(funcName)s] %(asctime)s | %(message)s'))
logger.addHandler(filelog)

def mapImport(infile, dstpath, server, engine, database, uid, pwd):
    # Import a single file, then move it to dstpath on success. Called through
    # mapArgs() so multiple arguments can be passed via Pool.map() (Python 2.6).
    logImport.logger = logger  # Share this script's logger with the import module

    rc = logImport.importMP(infile, server, engine, database, uid, pwd)
    if rc > 0:
        # Move the processed file to the completed directory
        try:
            os.rename(infile, os.path.join(dstpath, os.path.split(infile)[1]))
        except OSError:
            logger.error("Error: %s" % traceback.format_exc(1))
            raise

    return rc

def mapArgs(args):
    # Unpack the argument list for mapImport(); Pool.map() supplies only a single
    # argument per job on Python 2.6 (Python 3.3+ could use Pool.starmap() instead)
    return mapImport(*args)

if __name__ == '__main__':
    uid = 'import'
    pwd = getpass.getpass('Password for %s: ' % uid)

    src = 'F:\\import\\logs\\2011\\08'  # Directory tree of log files to import
    dst = 'F:\\import\\logs\\complete'  # Imported files are moved here

    jobs = []
    args = [dst, 'localhost', 's', 'testdb', uid, pwd]  # Destination, server, engine, database, credentials
    for root, dirs, files in os.walk(src):
        for name in files:
            jobs.append([os.path.join(root, name)] + args)  # One argument list per file found under src

    if len(jobs) > 0:
        logger.info('Queueing %s files' % len(jobs))

        rc = []
        stime = time.time()
        pool = multiprocessing.Pool(4)  # Four worker processes importing files concurrently
        try:
            rc = pool.map(mapArgs, jobs)
            pool.close()
            pool.join()
        except KeyboardInterrupt:  # http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt
            print('\nKeyboardInterrupt caught - terminating')
            pool.terminate()
            sys.exit()
        except Exception:
            logger.error("Error: %s" % traceback.format_exc(1))
            pool.terminate()
            sys.exit()

        runtime = (time.time()-stime)+.001  # Avoid div/0

        logger.info('Complete: %d rows, %d sec (%d r/s)' % (sum(rc), runtime, sum(rc)/runtime))
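
The launcher depends on a separate logImport module that is not included in the recipe; the only contract it relies on is that logImport.importMP() imports one file and returns the number of rows loaded, and that the module has a logger attribute the launcher can reassign. A minimal stand-in like the following, assuming that interface, lets the launcher be exercised without a database; its body only counts lines and is purely illustrative.

# logImport.py - illustrative stand-in for the real import module (assumption:
# importMP() returns the number of rows imported, or 0 on failure)
import logging

logger = logging.getLogger(__name__)  # Reassigned by importLogsM before each call

def importMP(infile, server, engine, database, uid, pwd):
    """Import a single log file and return the number of rows loaded."""
    rows = 0
    try:
        with open(infile) as f:
            for line in f:
                # A real implementation would parse the line and insert it into
                # the target database here; this stub only counts lines
                rows += 1
        logger.info('%s: %d rows' % (infile, rows))
    except Exception:
        logger.error('Failed to import %s' % infile)
        return 0
    return rows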
Created by Matt Keranen on Tue, 23 Aug 2011 (MIT)
