1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 | import pysqlite2.dbapi2 as sqlite
import Queue, time, thread, os
from threading import Thread
# Command identifiers stored in DbCmd.cmd and compared by execSQL / DbWrapper.
ConnectCmd, SqlCmd, StopCmd = "connect", "SQL", "stop"

# Queue on which execSQL places commands and from which workers take them.
sqlqueue = Queue.Queue()

# Number of worker threads; raised by execSQL on connect, lowered by a
# worker when it exits.  Updates are made while holding _threadex.
qthreads = 0
_threadex = thread.allocate_lock()
class DbCmd:
    """A command handed to execSQL.

    cmd    -- command identifier (ConnectCmd, SqlCmd or StopCmd).
    params -- payload of the command.  In this file a connect command
              carries the database path and an SQL command carries a list
              of (sql, args) tuples.  Defaults to a new empty list.

    ``resultqueue`` starts as None; execSQL replaces it with a queue before
    the command is handed to a worker.
    """

    def __init__(self, cmd, params=None):
        self.cmd = cmd
        # A literal ``[]`` default would be a single list shared by every
        # command created without params; build a fresh one per instance.
        if params is None:
            params = []
        self.params = params
        self.resultqueue = None
class DbWrapper(Thread):
    """Worker thread that opens its own connection and serves ``sqlqueue``.

    path -- database path passed to ``sqlite.connect``.
    nr   -- worker number; only used in the log line.

    For a SqlCmd, ``params`` is a list of (sql, args) pairs that are executed
    in order on one cursor.  Rows of all statements are collected into one
    list, a single commit is issued if any statement does not start with
    SELECT, and the list is put on the command's ``resultqueue``.

    If a statement raises, the batch is rolled back and the exception
    instance is put on ``resultqueue`` instead of a row list, so the caller
    is not left blocked; the worker keeps serving.

    Any other command makes the worker exit: it closes its connection,
    decrements ``qthreads``, passes the command on while other workers are
    still running, and puts None on the command's ``resultqueue``.
    """

    def __init__(self, path, nr):
        Thread.__init__(self)
        self.path = path
        self.nr = nr

    def run(self):
        # Local import: the module does not import sys at file level.
        import sys

        stop_cmd = None
        con = None
        try:
            con = sqlite.connect(self.path)
            cur = con.cursor()
            while True:
                s = sqlqueue.get()
                print("Conn %d -> %s -> %s" % (self.nr, s.cmd, s.params))
                if s.cmd != SqlCmd:
                    stop_cmd = s
                    break
                try:
                    res = self._run_batch(con, cur, s.params)
                except Exception:
                    # Drop the partial batch and hand the error to the
                    # caller, who is blocked on resultqueue.get().
                    con.rollback()
                    res = sys.exc_info()[1]
                s.resultqueue.put(res)
        finally:
            # Close before the counter drops, so the connection is already
            # closed when a stop request sees qthreads reach zero.
            try:
                if con is not None:
                    con.close()
            finally:
                self._retire(stop_cmd)

    def _run_batch(self, con, cur, statements):
        """Execute (sql, args) pairs as one transaction; return all rows."""
        commitneeded = False
        res = []
        for sql in statements:
            cur.execute(sql[0], sql[1])
            if not sql[0].upper().startswith("SELECT"):
                commitneeded = True
            res.extend(cur.fetchall())
        if commitneeded:
            con.commit()
        return res

    def _retire(self, stop_cmd):
        """Unregister this worker and forward the stop command if needed."""
        global qthreads
        _threadex.acquire()
        try:
            qthreads -= 1
            others_running = qthreads > 0
        finally:
            _threadex.release()
        if stop_cmd is None:
            # Exiting because of an error, not because of a command.
            return
        # Allow other threads to stop.  The last worker does not re-queue
        # the command: it would stay on the queue and stop the next worker
        # that is started.
        if others_running:
            sqlqueue.put(stop_cmd)
        stop_cmd.resultqueue.put(None)
def execSQL(s):
    """Dispatch a DbCmd and return its result.

    ConnectCmd -- start a new DbWrapper on the path in ``s.params``;
                  returns None.
    StopCmd    -- queue the command and poll until ``qthreads`` reaches
                  zero; returns None.
    otherwise  -- queue the command and block until a worker puts a value
                  on ``s.resultqueue``; that value is returned.
    """
    global qthreads
    if s.cmd == ConnectCmd:
        _threadex.acquire()
        try:
            qthreads += 1
            # Take the worker number while the lock is held, so two
            # concurrent connects cannot read the same counter value.
            nr = qthreads
        finally:
            _threadex.release()
        DbWrapper(s.params, nr).start()
        return None

    s.resultqueue = Queue.Queue()
    if s.cmd == StopCmd:
        # With no worker running nobody would consume the command; it
        # would stay queued and stop the next worker that is started.
        if qthreads > 0:
            sqlqueue.put(s)
            # sleep until all threads are stopped
            while qthreads > 0:
                time.sleep(0.1)
        return None

    sqlqueue.put(s)
    return s.resultqueue.get()
if __name__ == "__main__":
    # Demo: two workers on one database file, a few inserts and a select.
    dbname = "test.db"
    execSQL(DbCmd(ConnectCmd, dbname))
    try:
        execSQL(DbCmd(SqlCmd, [("create table people (name_last, age integer);", ())]))
        # don't add connections before creating table
        execSQL(DbCmd(ConnectCmd, dbname))
        # insert one row
        execSQL(DbCmd(SqlCmd, [("insert into people (name_last, age) values (?, ?);", ('Smith', 80))]))
        # insert two rows in one transaction
        execSQL(DbCmd(SqlCmd, [("insert into people (name_last, age) values (?, ?);", ('Jones', 55)),
                               ("insert into people (name_last, age) values (?, ?);", ('Gruns', 25))]))
        for p in execSQL(DbCmd(SqlCmd, [("select * from people", ())])):
            print(p)
    finally:
        # Always stop the workers and remove the file: a failure above
        # would otherwise leave the worker threads running and test.db
        # behind, making the next run's CREATE TABLE fail.
        execSQL(DbCmd(StopCmd))
        if os.path.exists(dbname):
            os.remove(dbname)
|