This DBI module implements the Cursor and Connection objects. You can create connections and cursors and call fetchone and fetchall. It uses ADO. More features will be added later.
#updated 8/27/2014
import win32com.client
# Single module-level ADO connection object; the Connection and Cursor classes
# below both operate on this one shared instance.
connection = win32com.client.Dispatch(r'ADODB.Connection')
# Value compared against connection.State to decide whether the connection is open.
adStateOpen = 1
class Connection:
    """Connection to a SQL Server instance through the module-level ADO object.

    Attributes:
        connection: the shared module-level ADODB.Connection object.
        version: placeholder string, left empty by this class.
        servername, username, password: values passed to the constructor.
        defdb: database named in the connection string ('master' when the
            caller passes no database).
        constr: the ADO connection string that was built.
        connected: 1 when the ADO connection reports the open state, else 0.
        cursor: a Cursor carrying the same connection settings.
    """

    def __init__(self, servername, username='', password='', db=''):
        """Build the connection string and open the ADO connection.

        Args:
            servername: SQL Server instance name.
            username: SQL login; an empty string selects a trusted
                (Windows-authenticated) connection.
            password: password for the SQL login.
            db: default database; an empty string falls back to 'master'.
        """
        self.connection = connection
        self.version = ''
        self.servername = servername
        self.username = username
        self.password = password
        self.defdb = db if db != '' else 'master'
        self.connected = 0

        prefix = "Provider=SQLOLEDB.1;Data Source=" + self.servername
        if username == '':
            self.constr = prefix + ";Trusted_Connection=yes; database=" + self.defdb
        else:
            # Only use SQL authentication when a login was supplied; without
            # this branch the trusted-connection string was always overwritten.
            self.constr = (prefix + ";uid=" + username + ";pwd=" + password
                           + "; database=" + self.defdb)

        # Test the connection by opening it and checking the reported state.
        try:
            connection.Open(self.constr)
            if connection.State == adStateOpen:
                self.connected = 1
            c = Cursor()
            c.servername = servername
            c.username = username
            c.password = password
            # The cursor receives the raw `db` argument, not the 'master' fallback.
            c.defdb = db
            c.constr = self.constr
            self.cursor = c
        except IndexError:
            # NOTE(review): only IndexError is handled here, as in the original;
            # COM failures raised by Open() presumably propagate to the caller
            # -- confirm whether they should be reported here as well.
            self.connected = 0
            print("Could not connect")

    def commit(self):
        """Do nothing; this method is here only for compatibility."""

    def close(self):
        """Close the shared ADO connection if it is open and return None.

        Rebinding the local name `self` (as the earlier code did) released
        nothing, so the connection is now closed explicitly.
        """
        if self.connection.State == adStateOpen:
            self.connection.Close()
        self.connected = 0
        return None
class Cursor:
    """Runs SQL on the module-level ADO connection and buffers the rows.

    Attributes:
        records: rows loaded by the last execute(); a tuple of row tuples
            after a statement that returned rows, otherwise an empty list.
        rowid: index of the row that the next fetchone() call returns.
        fieldnames: column names of the last result set.
        rowcount: number of rows loaded by the last execute().
    """

    def __init__(self):
        self.records = []
        self.rowid = 0
        self.fieldnames = []
        # Defined up front so reading it before any execute() cannot fail.
        self.rowcount = 0

    def execute(self, sql):
        """Execute `sql` and buffer its result set, if it has one.

        Args:
            sql: the SQL statement to send through the shared ADO connection.

        Every call resets records, fieldnames, rowcount and rowid so that
        state from a previous statement never leaks into the new one.
        """
        self.recordset = connection.execute(sql)
        self.records = []
        self.fieldnames = []
        self.rowcount = 0
        self.rowid = 0

        recordset = self.recordset
        # Statements that return no rows (updates, inserts) yield a recordset
        # that is not open; there is nothing to read from it.
        if recordset.State != adStateOpen:
            return

        fields = recordset.Fields
        self.fieldnames = [fields.Item(i).Name for i in range(fields.Count)]
        # An empty result set has no rows to fetch.
        if not self.fieldnames or recordset.EOF:
            return

        # GetRows() hands the data back column by column; transpose it so
        # each record is one row tuple.
        data = recordset.GetRows()
        self.records = tuple(zip(*data))
        self.rowcount = len(self.records)

    def fetchall(self):
        """Return every buffered row as a list."""
        return list(self.records)

    def fetchone(self):
        """Return the row at `rowid` as a tuple and advance `rowid`.

        Returns None once `rowid` is past the last buffered row; `rowid` is
        advanced on every call either way.
        """
        i = self.rowid
        self.rowid = i + 1
        try:
            return tuple(self.records[i])
        except IndexError:
            return None
if __name__ == '__main__':
    # Demo run against a local instance named sql2014 and the
    # AdventureWorks2012 database. The raw string keeps the backslash
    # literal without relying on an invalid escape sequence.
    c = Connection(r'(local)\sql2014', db='AdventureWorks2012')
    print("Connection string: " + c.constr)
    # Everything below needs a live connection, so it only runs when connected.
    if c.connected == 1:
        print("Connected OK")
        cu = c.cursor
        cu.execute('select top 10 * from Person.Person')
        print("list of columns:")
        print(cu.fieldnames)
        print('rowcount=' + str(cu.rowcount))
        print('select top 10 * from Person.Person')
        rows = cu.fetchall()
        for x in rows:
            print(x)
        print('Bringing records one by one')
        # Position the cursor explicitly, then read two consecutive rows.
        cu.rowid = 5
        rows = cu.fetchone()
        print(rows)
        rows = cu.fetchone()
        print(rows)
        print('Doing an update changing FirstName')
        cu = c.cursor
        FirstName = 'Kenny'
        BusinessEntityID = '1'
        cu.execute("update Person.Person set FirstName ='" + FirstName + "' where BusinessEntityID = " + BusinessEntityID)
        print('Reading record')
        cu.execute('select * from Person.Person where BusinessEntityID = 1')
        rows = cu.fetchall()
        print(rows)
        c.close()
This is a library I created to manage SQL Server databases. It works with SQL Server 2005, 2008, 2012, and 2014; connectivity is done with ADO. If you are a system engineer or database administrator and find yourself writing a lot of scripts and batch files that connect to MSSQL databases, you will find this library useful. This is version 1.0. Comments and suggestions are appreciated. See comments in the previous versions.
Added reset of fieldnames for each execution of cursor self.fieldnames = [] (it was appending to the list in successive executions)
Added reset of fieldnames for each execution of cursor self.fieldnames = [] (it was appending to the list in successive executions)