Welcome, guest | Sign In | My Account | Store | Cart
import pysqlite2.dbapi2 as sqlite
import Queue, time, thread, os
from threading import Thread

# Lock held while the shared worker counter `qthreads` is incremented or
# decremented.
_threadex = thread.allocate_lock()
# Count of worker threads: incremented when a connect command is executed,
# decremented when a worker handles a stop command.
qthreads = 0
# Queue of command objects shared by every worker thread.
sqlqueue = Queue.Queue()

# Command identifiers stored in DbCmd.cmd.
# ConnectCmd: start a new worker; params is the database path.
ConnectCmd = "connect"
# SqlCmd: run statements; params is a list of (sql, args) pairs.
SqlCmd = "SQL"
# StopCmd: shut the workers down.
StopCmd = "stop"
class DbCmd:
    """A command object passed to execSQL().

    cmd    -- the command identifier (ConnectCmd, SqlCmd or StopCmd).
    params -- the command payload.  For ConnectCmd it is the database path,
              for SqlCmd a list of (sql, args) pairs.  When omitted, each
              instance gets its own new empty list.
    """
    def __init__(self, cmd, params=None):
        self.cmd = cmd
        # A literal [] default would be one list object shared by every
        # command created without params; build a fresh one per instance.
        if params is None:
            params = []
        self.params = params

class DbWrapper(Thread):
    """Worker thread that owns one SQLite connection and serves sqlqueue.

    run() opens its own connection to `path` and then loops, taking command
    objects off the shared `sqlqueue`:

    * cmd == SqlCmd: every (sql, args) pair in the command's params is
      executed in order, all fetched rows are collected into one list, and a
      commit is issued if any statement was not a SELECT.  The list is put on
      the command's resultqueue.  If a statement raises, the connection is
      rolled back and the exception instance is put on resultqueue instead,
      so the waiting caller is released and the worker keeps serving.
    * any other cmd: treated as a stop request.  The worker decrements
      `qthreads`, passes the command on while other workers are still
      counted, puts None on resultqueue and exits.

    path -- database name passed to sqlite.connect()
    nr   -- number used to label this worker in the debug output
    """
    def __init__(self, path, nr):
        Thread.__init__(self)
        self.path = path
        self.nr = nr

    def _release(self):
        """Remove this worker from `qthreads`; return the remaining count."""
        global qthreads
        _threadex.acquire()
        try:
            qthreads -= 1
            return qthreads
        finally:
            # Always release, even if the update above fails.
            _threadex.release()

    def _execute(self, con, cur, statements):
        """Run the (sql, args) pairs as one transaction; return all rows."""
        commitneeded = False
        res = []
        for stmt in statements:
            cur.execute(stmt[0], stmt[1])
            # Leading whitespace must not make a SELECT look like a write.
            if not stmt[0].lstrip().upper().startswith("SELECT"):
                commitneeded = True
            res.extend(cur.fetchall())
        if commitneeded:
            con.commit()
        return res

    def run(self):
        # Local import: used only to fetch the active exception below.
        import sys
        con = None
        counted = True  # True while this worker is still part of qthreads
        try:
            con = sqlite.connect(self.path)
            cur = con.cursor()
            while True:
                s = sqlqueue.get()
                # Single parenthesized argument: prints the same text whether
                # print is a statement or a function.
                print("Conn %d -> %s -> %s" % (self.nr, s.cmd, s.params))
                if s.cmd == SqlCmd:
                    try:
                        res = self._execute(con, cur, s.params)
                    except Exception:
                        # Answer the caller with the error instead of dying
                        # silently and leaving it blocked on resultqueue.
                        res = sys.exc_info()[1]
                        # Drop whatever part of the batch already ran.
                        con.rollback()
                    s.resultqueue.put(res)
                else:
                    remaining = self._release()
                    counted = False
                    # Hand the stop on only while other workers remain, so no
                    # stale stop command is left queued for later workers.
                    if remaining > 0:
                        sqlqueue.put(s)
                    s.resultqueue.put(None)
                    break
        finally:
            # A worker that dies before handling a stop must still be taken
            # out of the count, otherwise a stop request waits forever.
            if counted:
                self._release()
            if con is not None:
                con.close()

def execSQL(s):
    """Dispatch a command object according to s.cmd.

    ConnectCmd -- count a new worker in `qthreads` and start a DbWrapper
                  thread on the path given in s.params.  Returns None.
    StopCmd    -- queue the stop command (only if workers are counted) and
                  sleep until `qthreads` has dropped to zero.  Returns None.
    otherwise  -- attach a fresh result queue to s, queue it, block until a
                  value is put on that queue and return it.  If the value is
                  an exception instance it is raised here rather than
                  returned as if it were a result.
    """
    global qthreads
    if s.cmd == ConnectCmd:
        _threadex.acquire()
        try:
            qthreads += 1
            # Take the worker number while the lock is held so a concurrent
            # connect cannot change it before it is used.
            nr = qthreads
        finally:
            _threadex.release()
        wrap = DbWrapper(s.params, nr)
        wrap.start()
    elif s.cmd == StopCmd:
        s.resultqueue = Queue.Queue()
        # With no workers counted nobody would consume the command; queuing
        # it would leave a stop waiting for the next worker to be started.
        if qthreads > 0:
            sqlqueue.put(s)
        # sleep until all threads are stopped
        while qthreads > 0:
            time.sleep(0.1)
    else:
        s.resultqueue = Queue.Queue()
        sqlqueue.put(s)
        result = s.resultqueue.get()
        if isinstance(result, Exception):
            raise result
        return result

if __name__ == "__main__":
    # Demo: two workers on one database file, a few inserts, one query.
    dbname = "test.db"
    insert_sql = "insert into people (name_last, age) values (?, ?);"

    # The first worker creates the table; further connections are added only
    # after the table exists.
    execSQL(DbCmd(ConnectCmd, dbname))
    create = ("create table people (name_last, age integer);", ())
    execSQL(DbCmd(SqlCmd, [create]))
    execSQL(DbCmd(ConnectCmd, dbname))

    # A single-row insert.
    single = [(insert_sql, ('Smith', 80))]
    execSQL(DbCmd(SqlCmd, single))

    # Two rows bundled into one transaction.
    pair = [
        (insert_sql, ('Jones', 55)),
        (insert_sql, ('Gruns', 25)),
    ]
    execSQL(DbCmd(SqlCmd, pair))

    # Read everything back and print one row per line.
    rows = execSQL(DbCmd(SqlCmd, [("select * from people", ())]))
    for p in rows:
        print(p)

    # Stop the workers, then delete the demo database file.
    execSQL(DbCmd(StopCmd))
    os.remove(dbname)

History

  • revision 3 (17 years ago)
  • previous revisions are not available