Welcome, guest | Sign In | My Account | Store | Cart
# dblib3.py
#Created by Jorge Besada, July 2014
#Same program as the Python Database Interface for MS SQL Server (for Python 2.x)
#converted to Python 3

import os
import sys


class Connection:
    def __init__(self, servername, username='', password='', db='', version=''):
        self.version = version;
        self.servername = servername;
        self.username = username;
        self.password = password;
        self.defdb = db;
        self.constr = '';
        if db == '':
            self.defdb = 'master'
        self.connected = 0
        if self.version == None or self.version == "":
            print("Need to pass sql version argument")
            return self
        if self.version == "sql2000" or self.version == "sql7":
            execsql = "osql"
        if self.version in ("sql2005", "sql2008", "sql2012", "sql2014"):
            execsql = "sqlcmd"
        if self.version == "sybase":
            execsql = "isql"
            print("Sorry, Sybase has not been implemented yet!")
            return self
        if username == '':
            self.constr = execsql + " -E -S" + self.servername + " -d" + self.defdb + " /w 8192 "
        else:
            self.constr = execsql + " -U" + self.username + " -P" + self.password + " -S" + self.servername + " -d" + self.defdb + " /w 8192 "

            #test connection:
        s = "set nocount on select name from master..syslogins where name = 'sa'"
        lst = os.popen(self.constr + ' -Q' + '"' + s + '"').readlines()

        try:
            if lst[2].strip() == 'sa':
                self.connected = 1
            else:
                self.connected = 0
            c = Cursor()
            c.servername = servername
            c.username = username
            c.password = password
            c.defdb = db
            c.constr = self.constr
            self.cursor = c
        except IndexError:
            print("Could not connect")

    def commit(self):
        "this is here for compatibility"
        pass

    def close(self):
        self = None
        return self


class Cursor:
    def __init__(self):
        self.defdb = ''
        self.servername = ''
        self.username = ''
        self.password = ''
        self.constr = ''
        self.rowcount = -1
        self.records = []
        self.rowid = 0
        self.sqlfile = "-Q"
        self.colseparator = chr(1)  #default column separator
        #this is going to be a list of lists, each one with:                                                                                       
        #name, type_code, display_size, internal_size, precision, scale, null_ok                                                                   
        self.description = []
        self.fieldnames = []
        self.fieldvalues = []
        self.fieldvalue = []
        #one dictionary by column                                                                                                                  
        self.dictfield = {'name': '', 'type_code': 0, 'display_size': 0, 'internal_size': 0, 'precision': 0, 'scale': 0,
                          'null_ok': 0}
        #list of lists                                                                                                                             
        self.dictfields = []

        #this is for compatibility to allow both types of calls:

    #cursor = connection.cursor() or using cursor = connection.cursor
    def __call__(self):
        c = Cursor()
        return c

    def execute(self, s):
        self.records = []
        lst = os.popen(self.constr + ' -s' + self.colseparator + " " + self.sqlfile + '"' + s + '"').readlines()
        if len(lst) == 0:
            return self.rowcount

        #If we get here we have results
        #rowcount maybe in last line, in this form: (4 rows affected)                                                                              
        tmplastline = lst[-1]
        if tmplastline[
            0] == "(":  #there is a rowcount
            lastline = lst[-1]
            spacepos = lastline.index(" ")
            cnt = lastline[1:spacepos]
            self.rowcount = int(cnt)
        else:
            #last line has no recordcount, so reset it to 0                                                                                        
            self.records = lst[:]
            self.rowcount = 0
            return self.rowcount

            #if we got here we may have a rowcount and the list with results
        i = 0
        #process metadata if we have it:                                                                                                           
        firstline = lst[0]
        lst1 = lst[0].split(self.colseparator)
        self.fieldnames = []
        for x in lst1:
            x1 = x.strip()
            self.fieldnames.append(
                x1)  #add column name
        #need to make a list for each column name                                                                                                  
        self.description = []
        for x in self.fieldnames:
            l = []
            l.append(x)
            for m in range(len(self.dictfield) - 1):
                l.append(0)
            l2 = tuple(l)
            self.description.append(l2)
        self.description = tuple(self.description)

        #Data section: lst[0] is row with column names,skip                                                                                        
        #If the resulting string starts and ends with '-', discard                                                                                 

        for x in lst[1:-1]:
            x0 = ''.join(x)
            x1 = x0.strip()
            if x1 > '' and x1[0] > '-' and x1[-1] > '-':
                #if x1 <> '' and x1[0] <> '-' and x1[-1] <> '-':
                self.records.append(x1)
                #reset for each execution
        self.rowid = 0
        return self.rowcount

        #returns one row of the result set, keeps track of the position

    def fetchone(self):
        i = self.rowid
        j = i + 1
        self.rowid = j
        try:
            return tuple(self.records[i].split(self.colseparator))
        except IndexError:
            pass

            #returns whole recordset

    def fetchall(self):
        lst = []
        try:
            for x in range(self.rowid, self.rowcount):
                x1 = tuple(self.records[x].split(self.colseparator))
                lst.append(x1)
        except IndexError:
            pass
        return lst

    def close(self):
        self.records = None
        self = None
        return self

    #-----------------------------------------

#Testing harness: we create and drop logins and databases                                                                                          
#Edit connection for desired server name and security options:                                                                                     
#Sample: for local server default instance SQL2000, integrated security                                                                                                             
#   c = Connection('(local)',db='pubs', version='sql2000')                                                                                         
#For local server, SQL security                                                                                                                    
#   c = Connection('(local)','sa','sa password',db='pubs', version='sql2000')                                                                      
#These tests use a restored AdventureWorks2012 database
#in a SQL2014 instance: (local)\sql2014


if __name__ == '__main__':
    c = Connection('(local)\sql2014', db='AdventureWorks2012', version='sql2014')
    print("Connection string: " + c.constr)
    if c.connected == 1:
        print("Connected OK")
    cu = c.cursor
    lst = cu.execute('select top 10  * from Person.Person')
    print('rowcount=' + str(cu.rowcount))
    rows = cu.fetchall()
    for x in rows:
        print(x)
    c.close()


    #Several SQL statements test                                                                                                                   
    lst = cu.execute("sp_addlogin 'test2', 'slslW$lllldQQmm!!'")
    print('rowcount=' + str(cu.rowcount))
    lst = cu.execute("select name from master..syslogins where name = 'test2'")
    print('rowcount=' + str(cu.rowcount))
    rows = cu.fetchall()
    for x in rows:
        print(x)
    c.close()


    lst = cu.execute("EXEC sp_droplogin 'test2'")
    print('rowcount=' + str(cu.rowcount))
    lst = cu.execute("select name from master..syslogins where name = 'test2'")
    print('rowcount=' + str(cu.rowcount))
    rows = cu.fetchall()
    for x in rows:
        print(x)
    c.close()


    lst = cu.execute("CREATE DATABASE test")
    print('rowcount=' + str(cu.rowcount))
    lst = cu.execute("select name from master..sysdatabases where name = 'test'")
    print('rowcount=' + str(cu.rowcount))
    rows = cu.fetchall()
    for x in rows:
        print(x)
    c.close()

    lst = cu.execute("DROP DATABASE test")
    print('rowcount=' + str(cu.rowcount))
    lst = cu.execute("select name from master..sysdatabases where name = 'test'")
    print('rowcount=' + str(cu.rowcount))
    rows = cu.fetchall()
    for x in rows:
        print(x)
    c.close()

    lst = cu.execute("update Person.Person set FirstName = 'Kenneth' where LastName = 'Sanchez' and MiddleName = 'J'")
    print('rowcount=' + str(cu.rowcount))
    lst = cu.execute("select FirstName, MiddleName, LastName from Person.Person  where LastName = 'Sanchez' and MiddleName = 'J'")
    print('rowcount=' + str(cu.rowcount))
    rows = cu.fetchall()
    for x in rows:
        print(x)
    c.close()      
                                                                                                                                                   

History