# (web page navigation residue, not part of the module: "Welcome, guest | Sign In | My Account | Store | Cart")
#dblib.py                                                                                                                                          
#created by Jorge Besada
#Last updated: 5/24/2010 - suggestion by Kosta Welke implemented
                                                                                                                                                   
import os,sys                                                                                                                                      
                                                                                                                                                   
class Connection:
    """Connection-like wrapper around a SQL Server command-line client.

    No persistent connection is held.  The constructor builds the command
    prefix (``constr``) for the client program, runs one probe query with it
    and, if the server answered, creates a ``Cursor`` that shares the prefix.

    Attributes:
        version, servername, username, password: the constructor arguments.
        defdb: database passed to the client with ``-d`` ('master' when the
            ``db`` argument is empty).
        constr: command prefix; '' when ``version`` is missing/unsupported.
        connected: 1 if the probe query returned the 'sa' login, else 0.
        cursor: ``Cursor`` bound to ``constr``, or None when no usable
            client/answer was available.
    """

    # Client executable used for each supported ``version`` value.
    _CLIENTS = {
        'sql7': 'osql',
        'sql2000': 'osql',
        'sql2005': 'sqlcmd',
        'sql2008': 'sqlcmd',
    }

    def __init__(self,sname,uname='',password='',db='', version=''):
        """Build the client command line and probe the server.

        Args:
            sname: server (and instance) name, passed with ``-S``.
            uname: login name; when empty the ``-E`` switch is used instead
                of ``-U``/``-P`` (the file's sample comments call this
                integrated security).
            password: password for ``uname``.
            db: default database; 'master' is used when empty.
            version: 'sql7', 'sql2000', 'sql2005' or 'sql2008'.  'sybase' is
                recognised but not implemented.

        Problems are reported with a printed message (the module's existing
        convention) and leave ``connected == 0``.
        """
        self.version = version
        self.servername = sname
        self.username = uname
        self.password = password
        self.defdb = db or 'master'
        self.constr = ''
        self.connected = 0
        # Always defined so callers can test it instead of hitting
        # AttributeError after a failed connection.
        self.cursor = None

        # __init__ must return None; returning self raises TypeError.
        if not version:
            print("Need to pass sql version argument")
            return
        if version == "sybase":
            print("Sorry, Sybase has not been implemented yet!")
            return
        execsql = self._CLIENTS.get(version)
        if execsql is None:
            # Previously this fell through and crashed on an unbound local.
            print("Unsupported sql version: " + str(version))
            return

        # NOTE(review): values are concatenated unquoted into a shell command
        # and the password ends up on the command line; names or passwords
        # containing spaces or shell metacharacters will break or be unsafe.
        if uname == '':
            self.constr = execsql + " -E -S" + self.servername + " -d" + self.defdb + " /w 8192 "
        else:
            self.constr = execsql + " -U" + self.username + " -P" + self.password + " -S" + self.servername + " -d" + self.defdb + " /w 8192 "

        # Probe the server: with nocount on, the answer is expected as a
        # header line, a separator line and then the login name.
        s = "set nocount on select name from master..syslogins where name = 'sa'"
        pipe = os.popen(self.constr + ' -Q' + '"' + s + '"')
        try:
            lst = pipe.readlines()
        finally:
            pipe.close()

        if len(lst) < 3:
            print("Could not connect")
            return
        if lst[2].strip() == 'sa':
            self.connected = 1

        c = Cursor()
        c.servername = sname
        c.username = uname
        c.password = password
        c.defdb = self.defdb  # effective database, not the raw (maybe empty) argument
        c.constr = self.constr
        self.cursor = c

    def commit(self):
        """Do nothing; present only for DB-API style compatibility."""
        pass

    def close(self):
        """Mark the connection as closed and return None.

        There is no underlying session to tear down (each statement spawns
        its own client process), so only the ``connected`` flag is cleared.
        """
        self.connected = 0
        return None
                                                                                                                                                   
                                                                                                                                                   
class Cursor:
    """Cursor-like object that runs SQL through a command-line client.

    ``execute`` appends the statement to the command prefix in ``constr``,
    runs it with ``os.popen`` and parses the textual output: first line is
    the column header, last line is expected to be "(N rows affected)", and
    the lines in between are data rows whose columns are separated by
    ``colseparator``.

    Attributes:
        constr: command prefix (client program plus connection switches).
        colseparator: column separator passed to the client with ``-s``.
        sqlfile: switch placed directly before the quoted statement.
        rowcount: count parsed from the last output line; 0 when the last
            line carries no count; -1 when there was no output at all.
        records: stripped data rows (still joined by ``colseparator``), or
            the raw output lines when no row count was found.
        rowid: index in ``records`` of the next row to fetch.
        fieldnames: column names taken from the header line.
        description: tuple of 7-tuples ``(name, 0, 0, 0, 0, 0, 0)``, one per
            column; only the name is filled in.
    """

    def __init__(self):
        self.defdb = ''
        self.servername = ''
        self.username = ''
        self.password = ''
        self.constr = ''
        self.rowcount = -1
        self.records = []
        self.rowid = 0
        self.sqlfile = "-Q"
        self.colseparator = chr(1) #default column separator
        #this is going to be a list of lists, each one with:
        #name, type_code, display_size, internal_size, precision, scale, null_ok
        self.description = []
        self.fieldnames = []
        self.fieldvalues = []
        self.fieldvalue = []
        #one dictionary by column
        self.dictfield = {'name':'', 'type_code':0,'display_size':0,'internal_size':0,'precision':0, 'scale':0, 'null_ok':0}
        #list of lists
        self.dictfields = []

    #this is for compatibility to allow both types of calls:
    #cursor = connection.cursor() or using cursor = connection.cursor
    def __call__(self):
        """Return a new cursor that shares this cursor's connection settings.

        A blank ``Cursor()`` has an empty ``constr`` and cannot execute
        anything, so the connection-related attributes are copied over.
        """
        c = Cursor()
        c.defdb = self.defdb
        c.servername = self.servername
        c.username = self.username
        c.password = self.password
        c.constr = self.constr
        c.sqlfile = self.sqlfile
        c.colseparator = self.colseparator
        return c

    def execute(self, s):
        """Run statement ``s`` through the client and parse its output.

        Returns the new ``rowcount`` (see the class docstring).
        """
        # NOTE(review): the statement is wrapped in double quotes and handed
        # to the shell as-is; SQL containing double quotes or shell
        # metacharacters will break the command.  Do not pass untrusted text.
        command = self.constr + ' -s' + self.colseparator + " " + self.sqlfile + '"' + s + '"'
        pipe = os.popen(command)
        try:
            lst = pipe.readlines()
        finally:
            pipe.close()
        return self._parse_output(lst)

    def _parse_output(self, lst):
        """Fill records/rowcount/fieldnames/description from output lines."""
        # Reset per-execution state up front so no path returns stale values.
        self.records = []
        self.rowid = 0
        self.rowcount = -1
        if not lst:
            return self.rowcount

        #rowcount maybe in last line, in this form: (4 rows affected)
        count = self._parse_rowcount(lst[-1])
        if count is None:
            #last line has no recordcount: keep the raw output, report 0 rows
            self.records = lst[:]
            self.rowcount = 0
            return self.rowcount
        self.rowcount = count

        if len(lst) == 1:
            # Only the "(N rows affected)" line: there is no header to parse.
            self.fieldnames = []
            self.description = ()
            return self.rowcount

        #metadata: lst[0] is the row with the column names
        self.fieldnames = [name.strip() for name in lst[0].split(self.colseparator)]
        padding = (0,) * (len(self.dictfield) - 1)
        self.description = tuple((name,) + padding for name in self.fieldnames)

        #data section: skip header (first) and rowcount (last) lines
        for line in lst[1:-1]:
            row = line.strip()
            if row and not self._is_separator(row):
                self.records.append(row)
        return self.rowcount

    def _parse_rowcount(self, line):
        """Return N from a "(N rows affected)" line, or None if absent."""
        text = line.strip()
        if not text.startswith("("):
            return None
        try:
            return int(text[1:].split(" ", 1)[0])
        except ValueError:
            return None

    def _is_separator(self, row):
        """Tell whether ``row`` is the dashed underline below the header.

        Such a line holds at least one dash and nothing but dashes, column
        separators and blanks.  Data rows that merely start or end with '-'
        (negative numbers, for instance) are not separators.
        """
        if '-' not in row:
            return False
        return not row.replace(self.colseparator, '').replace('-', '').strip()

    #returns one row of the result set, keeps track of the position
    def fetchone(self):
        """Return the next row as a tuple of strings, or None when exhausted."""
        rows = self.records or []
        if self.rowid >= len(rows):
            return None
        row = rows[self.rowid]
        self.rowid += 1
        return tuple(row.split(self.colseparator))

    #returns whole recordset
    def fetchall(self):
        """Return the not-yet-fetched rows as a list of tuples.

        Rows are taken from the current position up to ``rowcount`` (so raw
        output kept when no row count was found is not returned here), and
        the position is advanced past them.
        """
        rows = self.records or []
        stop = min(max(self.rowcount, 0), len(rows))
        batch = rows[self.rowid:stop]
        self.rowid = max(self.rowid, stop)
        return [tuple(row.split(self.colseparator)) for row in batch]

    def close(self):
        """Discard the current result set and return None.

        ``records`` becomes an empty list rather than None so that a fetch
        after close returns no rows instead of raising TypeError.
        """
        self.records = []
        self.rowid = 0
        self.rowcount = -1
        return None
                                                                                                                                                   
#-----------------------------------------                                                                                                         
                                                                                                                                                   
#Testing harness: we create and drop logins and databases                                                                                          
#Edit connection for desired server name and security options:                                                                                     
#Sample: for local server default instance SQL2000, integrated security                                                                                                             
#   c = Connection('(local)',db='pubs', version='sql2000')                                                                                         
#For local server, SQL security                                                                                                                    
#   c = Connection('(local)','sa','sa password',db='pubs', version='sql2000')                                                                      
#These tests use a restored pubs database                                                                                          
#in a SQL2008 instance (local)\sql2008                                                                     

              
if __name__ == '__main__':
    # Manual test run against a restored pubs database; it creates and drops
    # a login and a database, so point it only at a disposable instance.
    # Raw string: the backslash separates server and instance name and must
    # not be read as an escape sequence.
    c = Connection(r'(local)\sql2008', db='pubs', version='sql2008')
    print("Connection string: " + c.constr)
    if c.connected == 1:
        print("Connected OK")

    # The cursor attribute may be missing or None when the connection failed.
    cu = getattr(c, 'cursor', None)
    if cu is None:
        sys.exit("No cursor available; aborting tests")

    # Each batch is executed statement by statement (printing the rowcount
    # after each one); the rows of the batch's last statement are then listed.
    batches = [
        ['select * from authors'],
        #Several SQL statements test
        ["sp_addlogin 'test2', 'test2'",
         "select name from master..syslogins where name = 'test2'"],
        ["EXEC sp_droplogin 'test2'",
         "select name from master..syslogins where name = 'test2'"],
        ["CREATE DATABASE test",
         "select name from master..sysdatabases where name = 'test'"],
        ["DROP DATABASE test",
         "select name from master..sysdatabases where name = 'test'"],
        ["update authors set au_lname = 'Whitty' where au_id = '172-32-1176'",
         "select au_lname from authors  where au_id = '172-32-1176'"],
    ]
    for batch in batches:
        for statement in batch:
            cu.execute(statement)
            print('rowcount=' + str(cu.rowcount))
        for row in cu.fetchall():
            print(row)

    # Close once, after the cursor is no longer needed.
    c.close()
                                                                                                                                                   

# Diff to Previous Revision
#
# --- revision 10 2007-11-20 15:38:34
# +++ revision 11 2010-06-04 14:31:44
# @@ -1,6 +1,6 @@
#  #dblib.py
#  #created by Jorge Besada
# -#Last updated: 11/19/2007
# +#Last updated: 5/24/2010 - suggestion by Kosta Welke implemented
#
#  import os,sys
#
# @@ -20,7 +20,7 @@
#              return self
#          if self.version == "sql2000" or self.version == "sql7":
#              execsql = "osql"
# -        if self.version == "sql2005":
# +        if self.version == "sql2005" or self.version == "sql2008":
#              execsql = "sqlcmd"
#          if self.version == "sybase":
#              execsql = "isql"
# @@ -134,7 +134,8 @@
#          for x in lst[1:-1]:
             x0 = ''.join(x)                                                                                                                        
             x1 = x0.strip()                                                                                                                        
-            if x1 > '' and x1[0] > '-' and x1[-1] > '-':                                                                                           
+            #if x1 > '' and x1[0] > '-' and x1[-1] > '-':
+            if x1 <> '' and x1[0] <> '-' and x1[-1] <> '-':
                 self.records.append(x1)                                                                                                            
         #reset for each execution                                                                                                                  
         self.rowid = 0                                                                                                                             
@@ -170,17 +171,16 @@
                                                                                                                                                    
 #Testing harness: we create and drop logins and databases                                                                                          
 #Edit connection for desired server name and security options:                                                                                     
-#For local server, integrated security                                                                                                             
+#Sample: for local server default instance SQL2000, integrated security                                                                                                             
 #   c = Connection('(local)',db='pubs', version='sql2000')                                                                                         
 #For local server, SQL security                                                                                                                    
 #   c = Connection('(local)','sa','sa password',db='pubs', version='sql2000')                                                                      
-#The first part of the test uses a restored pubs database                                                                                          
-#in a SQL2005 instance (local)\sql1, the second test uses the pubs database                                                                        
-#from the default instance in the same server (local machine)                                                                        
+#These tests use a restored pubs database                                                                                          
+#in a SQL2008 instance (local)\sql2008                                                                     
 
               
 if __name__ == '__main__':                                                                                                                         
-    c = Connection('(local)\sql1',db='pubs', version='sql2005')                                                                                    
+    c = Connection('(local)\sql2008',db='pubs', version='sql2008')                                                                                    
     print "Connection string: " + c.constr                                                                                                         
     if c.connected == 1:                                                                                                                           
         print "Connected OK"                                                                                                                       
@@ -227,17 +227,14 @@
     rows = cu.fetchall()                                                                                                                           
     for x in rows:                                                                                                                                 
         print x                                                                                                                                    
-    c.close()                                                                                                                                      
-                                                                                                                                                   
-    print "\n\nRepeating test with SQL2000"                                                                                                        
-    c = Connection('(local)',db='pubs', version='sql2000')                                                                                         
-    print "Connection string: " + c.constr                                                                                                         
-    if c.connected == 1:                                                                                                                           
-        print "Connected OK"                                                                                                                       
-    cu = c.cursor                                                                                                                                  
-    lst = cu.execute('select * from authors')                                                                                                      
-    print 'rowcount=' + str(cu.rowcount)                                                                                                           
-    rows = cu.fetchall()                                                                                                                           
-    for x in rows:                                                                                                                                 
-        print x                                                                                                                                    
-    c.close()                                                                                                                                      
+    c.close()
+
+    lst = cu.execute("update authors set au_lname = 'Whitty' where au_id = '172-32-1176'")                                                                                                         
+    print 'rowcount=' + str(cu.rowcount)                                                                                                           
+    lst = cu.execute("select au_lname from authors  where au_id = '172-32-1176'")                                                                  
+    print 'rowcount=' + str(cu.rowcount)                                                                                                           
+    rows = cu.fetchall()                                                                                                                           
+    for x in rows:                                                                                                                                 
+        print x                                                                                                                                    
+    c.close()      
+                                                                                                                                                   

History