"""Run SQLite statements on a pool of worker threads fed by one shared queue.

Each worker thread owns its own connection.  Callers never touch a connection
directly: they wrap a request in a ``DbCmd`` and hand it to ``execSQL``.
"""
import os
import Queue
import sys
import thread
import time
from threading import Thread

import pysqlite2.dbapi2 as sqlite

# Guards every read-modify-write of ``qthreads``.
_threadex = thread.allocate_lock()
# Number of workers that have been started and have not yet retired.
qthreads = 0
# The single work queue shared by all workers.
sqlqueue = Queue.Queue()

ConnectCmd = "connect"
SqlCmd = "SQL"
StopCmd = "stop"


class DbCmd:
    """A request for ``execSQL``.

    cmd    -- one of ``ConnectCmd``, ``SqlCmd`` or ``StopCmd``.
    params -- for ``ConnectCmd`` the database path; for ``SqlCmd`` a list of
              ``(sql, parameters)`` pairs executed together as one batch;
              unused for ``StopCmd``.
    """

    def __init__(self, cmd, params=None):
        self.cmd = cmd
        # A literal ``[]`` default would be one list shared by every command
        # created without params, so build a fresh one per instance.
        if params is None:
            params = []
        self.params = params


def _retire_worker(stop_cmd=None):
    """Take one worker out of the running count.

    When the worker is leaving because of ``stop_cmd``, the command is passed
    on to the remaining workers and ``None`` is posted on its result queue.
    """
    global qthreads
    _threadex.acquire()
    try:
        qthreads -= 1
        remaining = qthreads
    finally:
        _threadex.release()
    if stop_cmd is not None:
        # Only re-queue while somebody is left to consume it; otherwise the
        # stale stop command would immediately kill the next worker started.
        if remaining > 0:
            sqlqueue.put(stop_cmd)
        stop_cmd.resultqueue.put(None)


def _run_batch(con, cur, statements):
    """Execute ``statements`` as one transaction on ``con``.

    Returns the list of all fetched rows, or -- if anything fails -- the
    exception instance, after rolling the batch back.  Returning instead of
    raising keeps the worker alive and lets it always answer the caller.
    """
    try:
        commitneeded = False
        res = []
        for sql in statements:
            cur.execute(sql[0], sql[1])
            if not sql[0].upper().startswith("SELECT"):
                commitneeded = True
            res.extend(cur.fetchall())
        if commitneeded:
            con.commit()
        return res
    except Exception:
        failure = sys.exc_info()[1]
        try:
            con.rollback()
        except sqlite.Error:
            # The original failure is what the caller needs to see.
            pass
        return failure


class DbWrapper(Thread):
    """Worker thread that serves ``sqlqueue`` using its own connection.

    path -- database file passed to ``sqlite.connect``.
    nr   -- worker number, used only in the trace output.
    """

    def __init__(self, path, nr):
        Thread.__init__(self)
        self.path = path
        self.nr = nr

    def run(self):
        """Serve commands until a non-SQL command (i.e. stop) arrives."""
        con = None
        stop_cmd = None
        try:
            con = sqlite.connect(self.path)
            cur = con.cursor()
            while True:
                s = sqlqueue.get()
                print("Conn %d -> %s -> %s" % (self.nr, s.cmd, s.params))
                if s.cmd != SqlCmd:
                    # Anything that is not SQL ends this worker.
                    stop_cmd = s
                    break
                # s.params is a list to bundle statements into a "transaction"
                s.resultqueue.put(_run_batch(con, cur, s.params))
        finally:
            # Close before retiring, so that once the running count reaches
            # zero every connection is known to be closed.  Retiring in a
            # ``finally`` also covers a failed connect, which would otherwise
            # leave ``execSQL(StopCmd)`` waiting forever.
            try:
                if con is not None:
                    con.close()
            finally:
                _retire_worker(stop_cmd)


def execSQL(s):
    """Carry out the command ``s``.

    ConnectCmd -- start one more worker on the database ``s.params``;
                  returns None.
    StopCmd    -- ask every worker to stop and wait until all have retired;
                  returns None.
    otherwise  -- queue ``s`` for a worker and block until it answers;
                  returns the list of fetched rows.  An exception raised while
                  executing the batch is re-raised here in the calling thread.
                  With no worker running this blocks indefinitely.
    """
    global qthreads
    if s.cmd == ConnectCmd:
        _threadex.acquire()
        try:
            qthreads += 1
            # Read the number under the lock so concurrent connects cannot
            # hand the same number to two workers.
            nr = qthreads
        finally:
            _threadex.release()
        DbWrapper(s.params, nr).start()
    elif s.cmd == StopCmd:
        s.resultqueue = Queue.Queue()
        _threadex.acquire()
        try:
            running = qthreads
        finally:
            _threadex.release()
        # With nobody running, a queued stop would linger and kill the next
        # worker that gets started.
        if running > 0:
            sqlqueue.put(s)
        # sleep until all threads are stopped
        while qthreads > 0:
            time.sleep(0.1)
    else:
        s.resultqueue = Queue.Queue()
        sqlqueue.put(s)
        result = s.resultqueue.get()
        if isinstance(result, Exception):
            raise result
        return result


if __name__ == "__main__":
    dbname = "test.db"
    execSQL(DbCmd(ConnectCmd, dbname))
    try:
        execSQL(DbCmd(SqlCmd, [("create table people (name_last, age integer);", ())]))
        # don't add connections before creating table
        execSQL(DbCmd(ConnectCmd, dbname))
        # insert one row
        execSQL(DbCmd(SqlCmd, [
            ("insert into people (name_last, age) values (?, ?);", ('Smith', 80)),
        ]))
        # insert two rows in one transaction
        execSQL(DbCmd(SqlCmd, [
            ("insert into people (name_last, age) values (?, ?);", ('Jones', 55)),
            ("insert into people (name_last, age) values (?, ?);", ('Gruns', 25)),
        ]))
        for p in execSQL(DbCmd(SqlCmd, [("select * from people", ())])):
            print(p)
    finally:
        # Always stop the workers (they are non-daemon threads) and remove
        # the scratch database, even if one of the statements above failed.
        execSQL(DbCmd(StopCmd))
        os.remove(dbname)