Welcome, guest | Sign In | My Account | Store | Cart
"""
Enables you to interrogate an Access database, run queries, and get 
results.
ADODB = Microsoft ActiveX Data Objects reference
ADOX = Microsoft ADO Ext
Great reference for ADODB is:
http://www.codeguru.com/cpp/data/mfc_database/ado/article.php/c4343/
Originally just an API wrapped around Douglas Savitsky's code at 
http://www.ecp.cc/pyado.html
Recordset iterator taken from excel.py in Nicolas Lehuen's code at 
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440661
Handling of field types taken from Craig Anderson's code at 
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/104801
An alternative approach might be 
http://phplens.com/lens/adodb/adodb-py-docs.htm
v1.0.5 - added ability to add a primary-foreign table relationship
v1.0.4 - added ability to delete a relationship by name
v1.0.3 - add ability to delete a named index, and to 
    close (release) a table.
v1.0.2 - added Close method to connection (recordset 
    automatically closes self already)
v1.0.1 - added DOUBLE and reordered data const mappings
"""
#To get constant values, open Access, make sure ADODB and ADOX are references, 
#  open library, and look at globals
#Cursor type and lock type arguments passed to ADODB Recordset.Open in 
#  the Table and Recordset classes
AD_OPEN_KEYSET = 1
AD_LOCK_OPTIMISTIC = 3
#ADOX key type and delete/update rule values used by 
#  AccessDb.addRelationship
AD_KEY_FOREIGN = 2
AD_RI_CASCADE = 1
#Labels stored in Field.type for each recognised ADO field type
INTEGER = 'integer'
SMALLINT = 'smallint'
UNSIGNEDTINYINT = 'unsignedtinyint'
CURRENCY = 'currency'
DATE = 'date'
BOOLEAN = 'boolean'
TIMESTAMP = 'timestamp'
VARCHAR = 'varchar'
LONGVARCHAR = 'longvarchar'
SINGLE = 'single'
DOUBLE = 'double'

#Labels stored in Index.type (unique or not) ...
INDEX_UNIQUE = 'unique'
INDEX_NOT_UNIQUE = 'notunique'
#... and in Index.primary (primary key or not)
INDEX_PRIMARY = 'indexprimary'
INDEX_NOT_PRIMARY = "indexnotprimary"

import win32com.client
#makepy must be run once before this module is used - see 
#http://www.thescripts.com/forum/thread482449.html.  One way to do it: 
#run PYTHON\Lib\site-packages\pythonwin\pythonwin.exe (replace PYTHON 
#with the folder python is in), choose Tools>COM Makepy utility, select 
#the library named Microsoft ActiveX Data Objects 2.8 Library (2.8) and 
#select OK.  See also Microsoft ActiveX Data Objects Recordset 2.8 
#Library (2.8).

class AccessDb:
    "Interface to MS Access database"
    
    def __init__(self, data_source, user, pwd="''", mdw="''"):
        """Open an ADODB connection to the jet database.
        
        data_source -- value for DATA SOURCE in the connection string
        user -- value for USER ID
        pwd -- value for PASSWORD (default is the two-character string '')
        mdw -- value for Jet OLEDB:System Database (same default)
        
        Raises Exception if the connection cannot be opened.
        NB use closeConn() to close, or .Close() on the object returned 
        by getConn() (NB title case unlike closing a file)."""
        self.connAccess = win32com.client.Dispatch(r'ADODB.Connection')
        #DSN syntax - http://support.microsoft.com/kb/193332 and 
        #http://www.codeproject.com/database/connectionstrings.asp?
        #df=100&forumid=3917&exp=0&select=1598401
        DSN = """PROVIDER=Microsoft.Jet.OLEDB.4.0;DATA SOURCE=%s;
            USER ID=%s;PASSWORD=%s;Jet OLEDB:System Database=%s;""" % \
            (data_source, user, pwd, mdw)
        try:
            self.connAccess.Open(DSN)
        except Exception as e:
            #NB the message echoes the whole DSN, password included
            raise Exception("Unable to open MS Access database " + 
                "using DSN: %s" % DSN + " - orig error: %s" % str(e))
    
    def _getCatalog(self):
        "Return a new ADOX catalog bound to this database's connection"
        cat = win32com.client.Dispatch(r'ADOX.Catalog')
        cat.ActiveConnection = self.connAccess
        return cat
    
    def getConn(self):
        "Return the underlying ADODB connection object"
        return self.connAccess
    
    def closeConn(self):
        "Close the underlying ADODB connection"
        self.connAccess.Close()
    
    def getRecordset(self, SQL_statement, dict=True):
        """Return a Recordset for SQL_statement.  The dict flag is 
        passed through to Recordset unchanged."""
        return Recordset(self.connAccess, SQL_statement, dict=dict)
    
    def getTableNames(self):
        "Get list of table names.  NB not system tables"
        #only catalog entries of Type 'TABLE' are kept
        return [tab.Name for tab in self._getCatalog().Tables 
                if tab.Type == 'TABLE']
    
    def getTables(self):
        "Get dictionary of Table objects - table name is the key"
        return dict((tab_name, Table(self.connAccess, tab_name)) 
                    for tab_name in self.getTableNames())
    
    def runQuery(self, SQL_statement):
        "Run SQL_statement through an ADODB command; nothing is returned"
        cmd = win32com.client.Dispatch(r'ADODB.Command')
        cmd.ActiveConnection = self.connAccess
        cmd.CommandText = SQL_statement
        cmd.Execute()

    def deleteIndex(self, tab_name, idx_name):
        """
        Delete index by name.
        NB cannot delete an index if a table is locked.
        Or if it is part of a relationship (I would expect).  
        Release (close) it first.
        Raises Exception (including the original error text) if the 
        delete itself fails.
        """
        cat = self._getCatalog()
        index_coll = None
        try:
            #lookup stays outside the wrapping handler so a bad table 
            #name still surfaces as the original error
            index_coll = cat.Tables(tab_name).Indexes
            try:
                index_coll.Delete(idx_name)
            except Exception as e:
                raise Exception("Unable to delete index - if table is " + 
                    "locked, make sure you release (close) it first.  " + 
                    "Orig error: " + str(e))
        finally:
            #drop the COM references whether or not the delete worked
            index_coll = None
            cat = None
    
    def addRelationship(self, tab_foreign_name, tab_foreign_key, 
                        tab_primary_name, tab_primary_key,
                        rel_name, cascade_del=False, 
                        cascade_update=False):
        """
        Add primary table-foreign table relationship.
        
        tab_foreign_name, tab_foreign_key -- table and column holding 
            the foreign key
        tab_primary_name, tab_primary_key -- related table and column
        rel_name -- name given to the new key
        cascade_del, cascade_update -- if true, the key's delete/update 
            rule is set to AD_RI_CASCADE
        
        Raises Exception if either table is not in the database or the 
        key cannot be added.
        """
        #fetch the table list once rather than once per table checked
        existing_tabs = self.getTableNames()
        for tab in (tab_foreign_name, tab_primary_name):
            if tab not in existing_tabs:
                raise Exception("Table \"%s\" is not in this database" 
                    % tab)
        cat = self._getCatalog()
        tbl_foreign = cat.Tables(tab_foreign_name)
        new_key = win32com.client.Dispatch(r'ADOX.Key')
        try:
            new_key.Name = rel_name
            new_key.Type = AD_KEY_FOREIGN
            new_key.RelatedTable = tab_primary_name
            new_key.Columns.Append(tab_foreign_key)
            new_key.Columns(tab_foreign_key).RelatedColumn = tab_primary_key
            if cascade_del:
                new_key.DeleteRule = AD_RI_CASCADE
            if cascade_update:
                new_key.UpdateRule = AD_RI_CASCADE
            tbl_foreign.Keys.Append(new_key)
        except Exception as e:
            raise Exception("Unable to add relationship '%s'. " % 
                rel_name + "Orig error: %s" % str(e))
        finally:
            tbl_foreign = None
            cat = None
    
    def deleteRelationship(self, tab_foreign_name, rel_name):
        """
        Delete relationship by relationship name.
        Need name of "foreign" table.
        http://msdn2.microsoft.com/en-us/library/aa164927(office.10).aspx
        Raises Exception if the table is not in the database or has no 
        key called rel_name.
        """
        if tab_foreign_name not in self.getTableNames():
            raise Exception("Table \"%s\" is not in this database" % 
                tab_foreign_name)
        cat = self._getCatalog()
        tbl_foreign = None
        try:
            tbl_foreign = cat.Tables(tab_foreign_name)
            tbl_keys = [x.Name for x in tbl_foreign.Keys]
            if rel_name not in tbl_keys:
                raise Exception("\"%s\" is not in " % rel_name + 
                    "relationships for table \"%s\"" % tab_foreign_name)
            tbl_foreign.Keys.Delete(rel_name)
        finally:
            #drop the COM references on failure as well as success
            tbl_foreign = None
            cat = None
        

class Table():
    "MS Access table object with rs, name, and index properties"
    def __init__(self, connAccess, tab_name):
        """Open table tab_name as an ADODB recordset on connAccess 
        and read its indexes.
        
        Sets self.rs (the open recordset), self.name and self.indexes 
        (list of Index objects).  Raises Exception if the table cannot 
        be opened."""
        self.connAccess = connAccess
        self.rs = win32com.client.Dispatch(r'ADODB.Recordset')
        try:
            self.rs.Open("[%s]" % tab_name, self.connAccess, AD_OPEN_KEYSET, 
                         AD_LOCK_OPTIMISTIC)
        except Exception as e:
            raise Exception("Problem opening " + 
                "table \"%s\" - " % tab_name + 
                "orig error: %s" % str(e))
        self.name = tab_name
        try:
            self.indexes = self.__getIndexes()
        except Exception:
            #the caller never receives this object if we fail here, so 
            #release the recordset rather than leave the table open
            self.rs.Close()
            raise
    
    def getFields(self):
        "Get list of Field objects, one per field in the recordset"
        return [Field(self.rs, field.Name) for field in self.rs.Fields]
    
    def __getIndexes(self):
        "Get list of Index objects, one per index on the table"
        cat = win32com.client.Dispatch(r'ADOX.Catalog')
        try:
            cat.ActiveConnection = self.connAccess
            return [Index(index) for index in cat.Tables(self.name).Indexes]
        finally:
            #release the catalog reference on every exit path
            cat = None
        
    def close(self):
        "Close table (releasing it)"
        self.rs.Close()
        
class Index():
    """Snapshot of an MS Access (ADOX) index.  Properties: name, 
    type (INDEX_UNIQUE or INDEX_NOT_UNIQUE), fields (list of the 
    names of the columns in the index) and primary (INDEX_PRIMARY 
    or INDEX_NOT_PRIMARY)"""
    def __init__ (self, index):
        "Copy the relevant properties from the ADOX index object"
        self.name = index.Name
        self.type = INDEX_UNIQUE if index.Unique else INDEX_NOT_UNIQUE
        self.fields = [column.Name for column in index.Columns]
        self.primary = (INDEX_PRIMARY if index.PrimaryKey 
                        else INDEX_NOT_PRIMARY)
    
class Field():
    "MS Access field object with name, type, and size properties"
    def __init__ (self, rs, field_name):
        """Read field field_name from recordset rs.
        
        Sets self.name, self.type (one of the module's field type 
        labels, e.g. INTEGER or VARCHAR) and self.size (the field's 
        DefinedSize).  Raises Exception if the ADO type is not one of 
        those handled below."""
        self.name = field_name
        adofield = rs.Fields.Item(field_name)
        adotype = adofield.Type
        #http://www.devguru.com/Technologies/ado/quickref/field_type.html
        #(ADO constant name, label) pairs, checked in this order.  The 
        #constants are looked up by name one at a time so that only 
        #those actually compared are fetched from win32com.
        type_labels = (
            ('adInteger', INTEGER),
            ('adSmallInt', SMALLINT),
            ('adUnsignedTinyInt', UNSIGNEDTINYINT),
            ('adSingle', SINGLE),
            ('adDouble', DOUBLE),
            ('adCurrency', CURRENCY),
            ('adBoolean', BOOLEAN),
            ('adDate', DATE),
            ('adDBTimeStamp', TIMESTAMP),
            ('adVarWChar', VARCHAR),
            ('adLongVarWChar', LONGVARCHAR),
        )
        ado_consts = win32com.client.constants
        for const_name, label in type_labels:
            if adotype == getattr(ado_consts, const_name):
                self.type = label
                break
        else:
            #raise a real exception - a bare string cannot be raised
            raise Exception("Unrecognised ADO field type %d" % adotype)
        self.size = adofield.DefinedSize

def encoding(value):
    """Normalise a value read from a recordset.
    
    String values are stripped of surrounding whitespace; a string 
    that is empty after stripping becomes None.  Python 2 unicode 
    values are additionally encoded to a byte string with "mbcs".  
    Anything that is not a string is returned unchanged.
    """
    try:
        text_type = unicode
    except NameError:
        #no separate unicode type (Python 3) - str is handled below
        text_type = None
    is_unicode = text_type is not None and isinstance(value, text_type)
    if not is_unicode and not isinstance(value, str):
        return value
    value = value.strip()
    if not value:
        return None
    if is_unicode:
        return value.encode("mbcs") #mbcs is a Windows, locale-specific encoding
    return value

class Recordset():
    "MS Access recordset created from a query"
    
    def __init__ (self, connAccess, SQL_statement, dict):
        """Open SQL_statement on connAccess as an ADODB recordset.
        
        dict -- if true, iteration yields each row as a dictionary 
            keyed by field name; otherwise each row is a list"""
        self.rs = win32com.client.Dispatch(r'ADODB.Recordset')
        self.rs.CursorLocation = 3 #uses client - makes it possible to use RecordCount property
        self.rs.Open(SQL_statement, connAccess, AD_OPEN_KEYSET, 
                     AD_LOCK_OPTIMISTIC)
        self.dict = dict
 
    def _closeRs(self):
        "Close the underlying recordset if it is still held"
        rs = getattr(self, 'rs', None)
        if rs is not None:
            #forget the reference first so a failing Close is not retried
            self.rs = None
            rs.Close()
    
    def getFieldNames(self):
        "Get list of field names"
        return [field.Name for field in self.rs.Fields]
    
    def hasRows(self):
        """Does the recordset contain any rows?
        NB tests this by calling MoveFirst, so the cursor is moved; 
        any error from that call is reported as False."""
        try:
            self.rs.MoveFirst()
        except Exception:
            return False
        return True
    
    def getCount(self):
        """
        Get record count - NB rs.CursorLocation had to be set to 
        3 (client) to enable this.  Returns 0 if the count cannot 
        be read.
        """
        try:
            return self.rs.RecordCount
        except Exception:
            return 0
    
    def __iter__(self):
        " Returns a paged iterator by default. See paged()."
        return self.paged()
    
    def paged(self,pagesize=128):
        """ Returns an iterator on the data contained in the recordset. 
        Each row is returned as a dictionary with field names as keys 
        (or as a list of values if the recordset was created with a 
        false dict flag), every value having been passed through 
        encoding(). pagesize is the number of rows fetched per GetRows 
        call ; it is an implementation detail but could have an impact 
        on the speed of the iterator. Use pagesize=-1 to buffer the 
        whole recordset in memory.
        The recordset is closed (and self.rs set to None) once the last 
        page has been fetched, or if iteration stops early for any 
        reason. A recordset already at EOF yields nothing.
        """
        try:
            field_names = self.getFieldNames()
            #check EOF before the first fetch so GetRows is never 
            #called on an empty recordset
            exhausted = self.rs.EOF
            while not exhausted:
                # Thanks to Rogier Steehouder for the transposing tip 
                rows = zip(*self.rs.GetRows(pagesize))
                exhausted = self.rs.EOF
                if exhausted:
                    # close the recordset as soon as possible
                    self._closeRs()
                for row in rows:
                    values = [encoding(value) for value in row]
                    if self.dict:
                        yield dict(zip(field_names, values))
                    else:
                        yield values
        finally:
            #also reached on an error or when the generator is discarded
            self._closeRs()

History

  • revision 2 (16 years ago)
  • previous revisions are not available