Welcome, guest | Sign In | My Account | Store | Cart

This class implement the features of retrieve dsn and tables list on win32. win32all not include this features.

Python, 151 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from ctypes import *

#Costants
SQL_FETCH_NEXT = 1

SQL_INVALID_HANDLE		= -2
SQL_SUCCESS				= 0
SQL_SUCCESS_WITH_INFO	= 1
SQL_NO_DATA_FOUND		= 100

SQL_NULL_HANDLE = 0
SQL_HANDLE_ENV = 1
SQL_HANDLE_DBC = 2
SQL_HANDLE_DESCR = 4
SQL_HANDLE_STMT = 3

SQL_ATTR_ODBC_VERSION = 200
SQL_OV_ODBC2 = 2

SQL_TABLE_NAMES = 3

SQL_C_CHAR = 1

#Custom exceptions
class OdbcInvalidHandle(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)
class OdbcGenericError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)


class FetchOdbcInfo:
    def __init__(self):
        self.odbc = windll.odbc32

    def connect_engine(self):
        #Connect to the engine. Return the enviroment and a connction handle
        env_h = c_int()
        dbc_h = c_int()
        stmt_h = c_int()
        
        self.odbc.SQLAllocHandle.restype = c_short
        ret = self.odbc.SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, byref(env_h))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_ENV, env_h, ret)

        self.odbc.SQLSetEnvAttr.restype = c_short
        ret = self.odbc.SQLSetEnvAttr(env_h, SQL_ATTR_ODBC_VERSION, SQL_OV_ODBC2, 0)
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_ENV, env_h, ret)

        self.odbc.SQLAllocHandle.restype = c_short
        ret = self.odbc.SQLAllocHandle(SQL_HANDLE_DBC, env_h, byref(dbc_h))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_DBC, dbc_h, ret)

        return env_h, dbc_h
    
    def connect_odbc(self, dbc_h, dsn, user, passwd = ''):
        #Connect to odbc, return a statement handle
        stmt_h = c_int()
        sn = create_unicode_buffer(dsn)
        un = create_unicode_buffer(user)        
        pw = create_unicode_buffer(passwd)
        self.odbc.SQLConnect.restype = c_short
        ret = self.odbc.SQLConnect(dbc_h, sn, len(sn), un, len(un), pw, len(pw))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_DBC, dbc_h, ret)

        self.odbc.SQLAllocHandle.restype = c_short
        ret = self.odbc.SQLAllocHandle(SQL_HANDLE_STMT, dbc_h, byref(stmt_h))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, stmt_h, ret)

        return stmt_h

    def get_cols(self, stmt_h):
        #Return a list with all tables
        self.odbc.SQLTables.restype = c_short
        #We want only tables
        t_type = create_unicode_buffer('TABLE')
        ret = self.odbc.SQLTables(stmt_h, None, 0, None, 0, None, 0, byref(t_type), len(t_type))
        if not ret == SQL_SUCCESS:
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)

        TableName = create_unicode_buffer(1024)
        buff_ind = c_int()
        self.odbc.SQLBindCol.restype = c_short
        ret = self.odbc.SQLBindCol(stmt_h, SQL_TABLE_NAMES, SQL_C_CHAR, byref(TableName), \
          len(TableName), byref(buff_ind))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
            
        self.odbc.SQLFetch.restype = c_short
        table_list = []
        while 1:
            ret = self.odbc.SQLFetch(stmt_h)
            if ret == SQL_NO_DATA_FOUND:
                break
            elif not ret == SQL_SUCCESS:
                self.ctrl_err(SQL_HANDLE_STMT, stmt_h, ret)
            table_list.append(TableName.value)
        return table_list

    def enum_dsn(self, env_h):
        #Return a list with [name, descrition]
        dsn = create_unicode_buffer(1024)
        desc = create_unicode_buffer(1024)
        dsn_len = c_int()
        desc_len = c_int()
        dsn_list = []
        self.odbc.SQLDataSources.restype = c_short
        while 1:
            ret = self.odbc.SQLDataSources(env_h, SQL_FETCH_NEXT, \
                dsn, len(dsn), byref(dsn_len), desc, len(desc), byref(desc_len))
            if ret == SQL_NO_DATA_FOUND:
                break
            elif not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_STMT, stmt_h, ret)
            else:
                dsn_list.append((dsn.value, desc.value))
        return dsn_list

    def ctrl_err(self, ht, h, val_ret):
        #Method for make a control of the errors
        #Return a raise with a list
        state = create_unicode_buffer(5)
        NativeError = c_int()
        Message = create_unicode_buffer(1024*10)
        Buffer_len = c_int()
        err_list = []
        number_errors = 1
        self.odbc.SQLGetDiagRec.restype = c_short
        while 1:
            ret = self.odbc.SQLGetDiagRec(ht, h, number_errors, state, \
                NativeError, Message, len(Message), byref(Buffer_len))
            if ret == SQL_NO_DATA_FOUND:
                #No more data, I can raise
                raise OdbcGenericError, err_list
                break
            elif ret == SQL_INVALID_HANDLE:
                #The handle passed is an invalid handle
                raise OdbcInvalidHandle, 'SQL_INVALID_HANDLE'
            elif ret == SQL_SUCCESS:
                err_list.append((state.value, Message.value, NativeError.value))
                number_errors += 1

You can simple make a call lake this:

if __name__ == '__main__': od = FetchOdbcInfo() env_h, dbc_h = od.connect_engine() dsn = od.enum_dsn(env_h) print dsn

dsn_test = ('dsn4test')
user = ('michele')

stmt_h = od.connect_odbc(dbc_h, dsn_test, user)
cols = od.get_cols(stmt_h)
print cols

1 comment

Michele Petrazzo (author) 16 years, 6 months ago  # | flag

New version. This is the new version that work very well into my projects:

http://unipex.it/vario/RealPyOdbc.py

This can work on all the platforms where ctypes work. In the future I'll make it db-api 2.0 compliant. If you have some question, email me directly.