Welcome, guest | Sign In | My Account | Store | Cart

This recipe lets database connections live in their own threads; queues are used to communicate with them.

Python, 80 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import pysqlite2.dbapi2 as sqlite
import Queue, time, thread, os
from threading import Thread

# Command kinds carried in DbCmd.cmd.
ConnectCmd, SqlCmd, StopCmd = "connect", "SQL", "stop"

# Single queue through which execSQL hands commands to the worker threads.
sqlqueue = Queue.Queue()
# Count of worker threads; every update is made while holding _threadex.
qthreads = 0
_threadex = thread.allocate_lock()
class DbCmd:
    """A command handed to execSQL and forwarded to a worker thread.

    cmd    -- one of ConnectCmd, SqlCmd or StopCmd.
    params -- payload of the command: the database path for ConnectCmd, a
              list of (sql, parameters) pairs for SqlCmd, unused for StopCmd.
              Defaults to a fresh empty list.
    """
    def __init__(self, cmd, params=None):
        # A literal [] default would be one list object shared by every
        # command created without params; build a new list per instance.
        if params is None:
            params = []
        self.cmd = cmd
        self.params = params
        # Reply channel; execSQL replaces this with a queue before the
        # command is placed on the shared command queue.
        self.resultqueue = None

class DbWrapper(Thread):
    """Worker thread that owns one database connection and serves sqlqueue.

    The connection is opened, used and closed inside run(), i.e. entirely
    within this thread.

    path -- database path passed to sqlite.connect().
    nr   -- number used to label this worker in the trace output.
    """
    def __init__(self, path, nr):
        Thread.__init__(self)
        self.path = path
        self.nr = nr

    def _run_batch(self, con, cur, statements):
        """Execute (sql, parameters) pairs as one unit and return all rows.

        A commit is issued once, after the last statement, if any statement
        does not start with SELECT.
        """
        rows = []
        commitneeded = False
        for sql in statements:
            cur.execute(sql[0], sql[1])
            if not sql[0].upper().startswith("SELECT"):
                commitneeded = True
            rows.extend(cur.fetchall())
        if commitneeded:
            con.commit()
        return rows

    def run(self):
        """Serve commands from sqlqueue until a non-SQL command arrives.

        For a SqlCmd the reply put on the command's resultqueue is the list
        of fetched rows, or the exception instance if a statement failed
        (the batch is rolled back first).  Any other command is treated as
        a stop request: it is put back on sqlqueue so the remaining workers
        see it too, None is sent as the reply, and the thread ends.
        """
        global qthreads
        try:
            con = sqlite.connect(self.path)
            try:
                cur = con.cursor()
                while True:
                    s = sqlqueue.get()
                    print("Conn %d -> %s -> %s" % (self.nr, s.cmd, s.params))
                    if s.cmd == SqlCmd:
                        reply = None
                        try:
                            reply = self._run_batch(con, cur, s.params)
                        except Exception as exc:
                            # Report the failure instead of dying silently,
                            # and undo whatever part of the batch was applied.
                            reply = exc
                            con.rollback()
                        finally:
                            # Always answer; the caller blocks on this queue.
                            s.resultqueue.put(reply)
                    else:
                        # allow other threads to stop
                        sqlqueue.put(s)
                        s.resultqueue.put(None)
                        break
            finally:
                con.close()
        finally:
            # Runs however the thread ends (stop request, failed connect,
            # unexpected error) and only after the connection is closed, so
            # a caller waiting for qthreads to reach zero is never stranded.
            _threadex.acquire()
            try:
                qthreads -= 1
            finally:
                _threadex.release()


def execSQL(s):
    """Dispatch a DbCmd.

    ConnectCmd -- start one more worker thread for the database s.params.
    StopCmd    -- ask the workers to stop and sleep until qthreads is zero.
    otherwise  -- queue the command, wait for the worker's reply and return
                  it; a reply that is an exception instance is raised in
                  the calling thread instead of being returned.

    Returns None for ConnectCmd and StopCmd.
    """
    global qthreads
    if s.cmd == ConnectCmd:
        _threadex.acquire()
        try:
            qthreads += 1
            # Read the worker number while still holding the lock so a
            # concurrent connect/stop cannot change it in between.
            nr = qthreads
        finally:
            _threadex.release()
        DbWrapper(s.params, nr).start()
    elif s.cmd == StopCmd:
        s.resultqueue = Queue.Queue()
        sqlqueue.put(s)
        # sleep until all threads are stopped
        # NOTE(review): the last worker puts the stop command back on
        # sqlqueue, so it appears to stay queued after shutdown; a worker
        # started later would pick it up first -- confirm before reusing
        # the queue after a stop.
        while qthreads > 0:
            time.sleep(0.1)
    else:
        s.resultqueue = Queue.Queue()
        sqlqueue.put(s)
        reply = s.resultqueue.get()
        if isinstance(reply, Exception):
            raise reply
        return reply

if __name__ == "__main__":
    # Demo: two workers on a scratch database, a few inserts, one select.
    dbname = "test.db"
    insert_person = "insert into people (name_last, age) values (?, ?);"

    execSQL(DbCmd(ConnectCmd, dbname))
    create_table = ("create table people (name_last, age integer);", ())
    execSQL(DbCmd(SqlCmd, [create_table]))
    # the second connection is only added once the table exists
    execSQL(DbCmd(ConnectCmd, dbname))

    # a single row
    execSQL(DbCmd(SqlCmd, [(insert_person, ('Smith', 80))]))
    # two rows bundled into one transaction
    two_rows = [
        (insert_person, ('Jones', 55)),
        (insert_person, ('Gruns', 25)),
    ]
    execSQL(DbCmd(SqlCmd, two_rows))

    people = execSQL(DbCmd(SqlCmd, [("select * from people", ())]))
    for p in people:
        print(p)

    execSQL(DbCmd(StopCmd))
    os.remove(dbname)

(py)SQLite restricts the use of a database connection to the thread in which it was created. This recipe facilitates the use of (py)SQLite from web applications (like CherryPy) without the need to connect to the database for every SQL command.