Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
# -*- coding: utf8 -*-
# Revision keyword string of this script; printed by the --version option.
__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'

# Export PostgreSQL schema to text.
# Usable to compare databases that should be the same.
#
# PostgreSQL schema info:
# http://www.alberton.info/postgresql_meta_info.html
#
#
# author: Michal Niklas

# Help text for the CPython drivers; printed by --help, when no argument is
# given, and as part of the "incorrect connect string" error.
USAGE = (
    'usage:\n'
    '\tschema_pg.py connect_string\n'
    '\t\tconnect string:\n'
    '\t\t\thost:[port:]database:user:password\n'
    '\t\tor for ODBC:\n'
    '\t\t\tdatabase/user/passwd\n'
    '\t\tor (pyodbc)\n'
    '\t\t\tDriver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;'
)

import sys
import array
import exceptions

# Becomes 1 when the Jython-only zxJDBC driver can be imported.
USE_JYTHON = 0

# When true only the table/column sections are dumped (set by --tables_only).
TABLES_ONLY = 0

try:
    from com.ziclix.python.sql import zxJDBC
    USE_JYTHON = 1
    # With zxJDBC the connection is given as a JDBC URL plus user and password,
    # so the help text is replaced accordingly.
    USAGE = """usage:
\tschema_pg.py jdbcurl user passwd
example:
\tjython schema_pg.py jdbc:postgresql://isof-test64:5434/gryfcard_mrb?stringtype=unspecified user passwd > db.schema 2> db.err
"""
except ImportError:
    # zxJDBC is not importable here: keep the non-Jython defaults.
    USE_JYTHON = 0


# Encodings tried in this order by output_str() to decode a line that could
# not be written directly.
DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')

# Encoding output_str() uses when it has to encode a line explicitly.
OUT_FILE_ENCODING = 'UTF8'


# All queries below skip tables whose name starts with '_'
# (position('_' in name) <> 1) unless noted otherwise.

# Names of tables in schema 'public'.
TABLE_NAMES_SQL = """SELECT DISTINCT table_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1"""


# (table, column) pairs for tables in schema 'public'.
TABLE_COLUMNS_SQL = """SELECT DISTINCT table_name, column_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2"""


# Column details (position, type, nullability, length, precision) for schema 'public'.
TABLE_INFO_SQL = """SELECT table_name, column_name, ordinal_position, data_type, is_nullable, character_maximum_length, numeric_precision
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2
"""


# Primary key constraints: table name and space-separated column numbers.
PRIMARY_KEYS_INFO_SQL = """SELECT t.relname AS table_name, array_to_string(c.conkey, ' ') AS constraint_key
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
WHERE c.contype = 'p'
AND position('_' in t.relname ) <> 1
ORDER BY table_name;
"""


# Non-primary indexes of one table; takes the table name via '%s'.
INDEXES_INFO_SQL = """SELECT relname, indisunique
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
     )
ORDER BY 1, 2
"""


# Non-primary indexes of one table with their indkey (column numbers);
# takes the table name via '%s'.
INDEXES_COLUMNS_INFO1_SQL = """SELECT relname, indkey
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND pg_class.oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
)
ORDER BY 1
"""


# Column name for a (table name, column number) pair; takes both via '%s'.
INDEXES_COLUMNS_INFO2_SQL = """SELECT DISTINCT a.attname
     FROM pg_index c
LEFT JOIN pg_class t
       ON c.indrelid  = t.oid
LEFT JOIN pg_attribute a
       ON a.attrelid = t.oid
      AND a.attnum = ANY(indkey)
    WHERE t.relname = '%s'
      AND a.attnum = %s
"""


# Foreign key constraints: referencing table and referenced table.
FOREIGN_KEYS_INFO_SQL = """SELECT t.relname AS table_name, t2.relname AS references_table
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
LEFT JOIN pg_class t2 ON c.confrelid = t2.oid
WHERE c.contype = 'f'
AND position('_' in t.relname ) <> 1
ORDER BY table_name, references_table
"""


# Columns that have a default value (no schema filter is applied here).
DEFAULTS_INFO_SQL = """SELECT table_name, column_name, column_default
FROM information_schema.columns
WHERE column_default IS NOT NULL
AND position('_' in table_name) <> 1
ORDER BY table_name, column_name"""


# View names and definitions outside pg_catalog and information_schema.
VIEWS_INFO_SQL = """SELECT viewname, definition
FROM pg_views
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY viewname
"""


# Triggers outside pg_catalog and information_schema.
TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3, 4, 5"""



# Module-wide database connection; assigned by init_db_conn(), read via db_conn().
_CONN = None

# Table names collected by show_tables() and iterated by the index dumps.
TABLES = []


def _parse_pg_connect_string(connect_string):
    """Split 'host:[port:]database:user:password' into (host, port, dbname, user, passwd).

    port is -1 when the string has no port field.  Raises ValueError when the
    string has fewer than four fields or the port is not an integer.
    """
    arr = connect_string.split(':')
    if len(arr) > 4:
        host = arr[0]
        port = int(arr[1])
        dbname = arr[2]
        user = arr[3]
        # everything after the 4th colon is the password, so a password
        # containing ':' is no longer cut at its first colon
        passwd = ':'.join(arr[4:])
    elif len(arr) == 4:
        host, dbname, user, passwd = arr
        port = -1
    else:
        raise ValueError('Incorrect connect_string!\n\n%s' % (USAGE))
    return host, port, dbname, user, passwd


def init_db_conn(connect_string, username, passwd):
    """Open the module-wide db connection and return it, or None on failure.

    The driver is chosen from the environment and the connect string:
      * under Jython: zxJDBC, connect_string is a JDBC URL and
        username/passwd are the credentials,
      * connect_string containing '/': the odbc module,
      * connect_string starting with 'Driver=': pyodbc,
      * otherwise 'host:[port:]database:user:password' with psycopg2,
        falling back to PyGres (pgdb) when psycopg2 cannot be imported.

    Any error is printed together with the connection info and reported
    to the caller as a None result instead of an exception.
    """
    global _CONN
    dbinfo = connect_string
    try:
        # NOTE(review): connect strings and the DSN printed below contain the
        # password in clear text and go to stdout - confirm this is acceptable.
        print(dbinfo)
        if USE_JYTHON:
            _CONN = zxJDBC.connect(connect_string, username, passwd, 'org.postgresql.Driver')
        elif '/' in connect_string:
            import odbc
            _CONN = odbc.odbc(connect_string)
            print(_CONN)
        elif connect_string.startswith('Driver='):
            import pyodbc
            # Driver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;
            _CONN = pyodbc.connect(connect_string)
            print(_CONN)
        else:
            host, port, dbname, user, passwd = _parse_pg_connect_string(connect_string)
            if port > 0:
                sport = 'port=%d' % (port)
            else:
                sport = ''
            dsn = "host=%s %s dbname=%s user=%s password=%s" % (host, sport, dbname, user, passwd)
            print(dsn)
            dbinfo = 'db: %s:%s' % (host, dbname)
            use_pgdb = 0
            try:
                import psycopg2
            except ImportError:
                try:
                    import pgdb
                    use_pgdb = 1
                except ImportError:
                    # built-in ImportError: no need for the Python-2-only 'exceptions' module
                    raise ImportError('No PostgreSQL library, install psycopg2 or PyGres!')
            if not _CONN:
                print(dbinfo)
                if use_pgdb:
                    # pgdb takes the port as part of the host ('host:port');
                    # without this the port from the connect string was ignored
                    pgdb_host = host
                    if port > 0:
                        pgdb_host = '%s:%d' % (host, port)
                    _CONN = pgdb.connect(database=dbname, host=pgdb_host, user=user, password=passwd)
                    print(_CONN)
                else:
                    _CONN = psycopg2.connect(dsn)
                    print(_CONN)
    except (KeyboardInterrupt, SystemExit):
        # never turn a user interrupt or interpreter exit into "connection failed"
        raise
    except:
        # deliberately broad: every driver raises its own error types
        ex = sys.exc_info()
        s = 'Exception: %s: %s\n%s' % (ex[0], ex[1], dbinfo)
        print(s)
        return None
    return _CONN


def db_conn():
    """Return the module-wide db connection (None until init_db_conn() succeeds)."""
    return _CONN


def output_str(fout, line):
    """Write line plus a newline to fout and flush; do nothing when fout is false.

    When the plain write raises a Unicode error the line is retried encoded
    with OUT_FILE_ENCODING, then decoded with each of DB_ENCODINGS in turn.
    If every attempt fails, a marker line and repr(line) are written instead.
    """
    if not fout:
        return
    try:
        fout.write(line)
    except (UnicodeDecodeError, UnicodeEncodeError):
        try:
            fout.write(line.encode(OUT_FILE_ENCODING))
        except (UnicodeDecodeError, UnicodeEncodeError):
            for enc in DB_ENCODINGS:
                try:
                    fout.write(line.decode(enc))
                    break
                except (UnicodeDecodeError, UnicodeEncodeError):
                    pass
            else:
                # no encoding worked: keep the content visible in escaped form
                fout.write('!!! line cannot be encoded !!!\n')
                fout.write(repr(line))
    fout.write('\n')
    fout.flush()


def output_line(line, fout=None):
    """Write line, stripped of trailing whitespace, to fout (if given) and to stdout."""
    line = line.rstrip()
    output_str(fout, line)
    output_str(sys.stdout, line)


def select_qry(querystr):
    """Run querystr on the global connection and return all rows.

    Returns None when querystr is empty.
    """
    if not querystr:
        return None
    cur = db_conn().cursor()
    try:
        cur.execute(querystr)
        return cur.fetchall()
    finally:
        # close the cursor even when execute/fetch raises
        cur.close()


def fld2str(fld_v):
    """Convert a field value to a string.

    Floats in plain decimal notation lose their trailing zeros and a
    trailing dot ('2.50' -> '2.5', '1.0' -> '1'); every other value is
    formatted with '%s'.
    """
    if isinstance(fld_v, float):
        text = '%s' % fld_v
        # Exponent notation must be left alone: stripping zeros from
        # '1.5e+20' would yield '1.5e+2', a different number.
        if '.' in text and 'e' not in text.lower():
            text = text.rstrip('0').rstrip('.')
        return text
    return '%s' % fld_v


def show_qry(title, querystr, fld_join='\t', row_separator=None):
    """Print a titled section with the rows returned by querystr.

    Fields of a row are converted with fld2str() and joined with fld_join;
    row_separator, when given, is printed after every row.  A section
    without rows prints ' -- NO DATA --'.
    """
    output_line('\n\n')
    output_line('--- %s ---' % title)
    rs = select_qry(querystr)
    if not rs:
        output_line(' -- NO DATA --')
        return
    for row in rs:
        output_line(fld_join.join([fld2str(s) for s in row]))
        if row_separator:
            output_line(row_separator)


def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
    """Print a titled section with the rows of querystr run once per table.

    querystr must contain one '%s' placeholder that receives each name from
    TABLES.  Prints ' -- NO DATA --' when no table produced any row.
    """
    output_line('\n\n')
    output_line('--- %s ---' % title)
    cnt = 0
    for tbl in TABLES:
        # the name lands inside a quoted SQL literal, so double embedded quotes
        rs = select_qry(querystr % tbl.replace("'", "''"))
        if rs:
            for row in rs:
                # fld2str (as in show_qry) instead of str(): str() raises on
                # non-ASCII unicode values under Python 2
                output_line(fld_join.join([fld2str(s) for s in row]))
                cnt += 1
                if row_separator:
                    output_line(row_separator)
    if cnt == 0:
        output_line(' -- NO DATA --')


def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
    """Print the rows of querystr for one table, each prefixed with the table name.

    querystr must contain one '%s' placeholder for the table name.
    Nothing is printed when the query returns no rows.
    """
    # the name lands inside a quoted SQL literal, so double embedded quotes
    rs = select_qry(querystr % table.replace("'", "''"))
    if rs:
        for row in rs:
            # fld2str (as in show_qry) instead of str(): str() raises on
            # non-ASCII unicode values under Python 2
            fields = fld_join.join([fld2str(s) for s in row])
            output_line("%s%s%s" % (table, fld_join, fields))
            if row_separator:
                output_line(row_separator)


def init_session():
    """Hook for changing db session settings such as locale; does nothing by default."""
    pass


def show_tables():
    """Fill TABLES with the table names and print the table and column sections."""
    cur = db_conn().cursor()
    try:
        cur.execute(TABLE_NAMES_SQL)
        names = [row[0] for row in cur.fetchall()]
    finally:
        # close the cursor even when the query fails
        cur.close()
    # replace the content in place: a repeated call must not duplicate names
    TABLES[:] = names
    show_qry('tables', TABLE_NAMES_SQL)
    show_qry('table columns', TABLE_COLUMNS_SQL)
    show_qry('columns', TABLE_INFO_SQL)


def show_primary_keys():
    """Print the 'primary keys' section."""
    show_qry('primary keys', PRIMARY_KEYS_INFO_SQL)


def show_indexes():
    """Print the 'indexes' and 'indexes columns' sections for every table in TABLES."""
    output_line('\n\n')
    output_line('--- %s ---' % 'indexes')
    for tbl in TABLES:
        show_qry_ex(INDEXES_INFO_SQL, tbl)

    output_line('\n\n')
    output_line('--- %s ---' % 'indexes columns')
    cur = db_conn().cursor()
    try:
        for tbl in TABLES:
            # the name lands inside a quoted SQL literal, so double embedded quotes
            sql_tbl = tbl.replace("'", "''")
            cur.execute(INDEXES_COLUMNS_INFO1_SQL % sql_tbl)
            for idx_row in cur.fetchall():
                idxname = idx_row[0]
                # indkey is formatted with '%s' and split on whitespace into
                # column numbers
                idxflds = '%s' % idx_row[1]
                for fld in idxflds.split():
                    # resolve each column number to its name with its own query
                    cur.execute(INDEXES_COLUMNS_INFO2_SQL % (sql_tbl, fld))
                    for col_row in cur.fetchall():
                        output_line("%s\t%s\t%s" % (tbl, idxname, col_row[0]))
    finally:
        # the cursor used to be left open
        cur.close()


def show_foreign_keys():
    """Print the 'foreign keys' section."""
    show_qry('foreign keys', FOREIGN_KEYS_INFO_SQL)


def show_defaults():
    """Print the 'defaults' section (columns with a default value)."""
    show_qry('defaults', DEFAULTS_INFO_SQL)


def show_views():
    """Print the 'views' section: name and definition on separate lines,
    with a separator printed after each view."""
    show_qry('views', VIEWS_INFO_SQL, '\n', '\n\n')


def get_arg_type(at):
    """Return the type name for type id at, as given by pg_catalog.format_type()."""
    cur = db_conn().cursor()
    try:
        cur.execute("select pg_catalog.format_type('%s', NULL)" % at)
        row = cur.fetchone()
    finally:
        # the cursor used to be left open on every call
        cur.close()
    return row[0]


def join_arg(arg_name, arg_type, mode='i'):
    """Return one procedure argument as '[MODE ]name type'.

    mode is a pg_proc.proargmodes code: 'o' gives the prefix 'OUT ',
    'b' gives 'INOUT ' and 'v' gives 'VARIADIC '; any other code
    (including the default 'i') gives no prefix.
    """
    prefixes = {'o': 'OUT ', 'b': 'INOUT ', 'v': 'VARIADIC '}
    return '%s%s %s' % (prefixes.get(mode, ''), arg_name, arg_type)


def get_subfields(subfield_fld):
    """Convert an array-like field value into a list of strings.

    Drivers return array columns in different shapes, for example the text
    '{code,status_str}', a list ['code', 'status_str'] or an array.array.
    All of them yield ['code', 'status_str'].  Returns None for an empty
    or missing value.
    """
    if not subfield_fld:
        return None
    if isinstance(subfield_fld, (list, tuple, array.array)):
        # Join the elements themselves; formatting the whole sequence would
        # go through repr() and leave ', ' separators (and u'' prefixes
        # under Python 2) inside the items.
        subfield = ','.join(['%s' % s for s in subfield_fld])
    else:
        subfield = '%s' % (subfield_fld)
    for ch in ('{', '}', '[', ']', "'"):
        subfield = subfield.replace(ch, '')
    return [item.strip() for item in subfield.split(',')]


def _proc_args(row, argtypes_dict):
    """Build the argument list text for one pg_proc row of show_procedures().

    row holds proargtypes at [1], proargnames at [2], proallargtypes at [4]
    and proargmodes at [5].  argtypes_dict caches type id -> type name so
    every type is looked up only once.
    """
    argtypes_nrs = get_subfields(row[4])
    argmodes = get_subfields(row[5])
    if not argtypes_nrs:
        # no proallargtypes: use proargtypes, all arguments treated as IN
        argtypes_nrs = ('%s' % row[1]).split()
        argmodes = None
    argtypes = []
    for at in argtypes_nrs:
        at = at.strip()
        if at:
            if at not in argtypes_dict:
                argtypes_dict[at] = get_arg_type(at)
            argtypes.append(argtypes_dict[at])
    if not argtypes:
        return ''
    if not argmodes:
        argmodes = 'i' * len(argtypes)
    # arguments without names are still listed, by type only
    argnames = get_subfields(row[2]) or [''] * len(argtypes)
    parts = []
    for (a, t, m) in zip(argnames, argtypes, argmodes):
        # split/join collapses the extra blanks left by an empty name
        parts.append(' '.join(join_arg(a.strip(), t, m.strip()).split()))
    return ', '.join(parts)


def show_procedures():
    """Print every routine outside the system schemas as a CREATE FUNCTION statement.

    Routine names come from information_schema.routines; for each name
    the matching pg_proc rows in schema 'public' supply return type,
    arguments, source and language.  Leading blank lines of the source
    are skipped.
    """
    argtypes_dict = {}
    output_line('\n\n --- procedures ---')
    cur = db_conn().cursor()
    try:
        cur.execute("""SELECT DISTINCT routine_name
        FROM information_schema.routines
        WHERE specific_schema NOT IN ('pg_catalog', 'information_schema')
        ORDER BY 1""")
        rows = cur.fetchall()
        for rt_row in rows:
            funname = rt_row[0]
            # NOTE(review): pg_proc.proisagg was replaced by prokind in
            # PostgreSQL 11; this query needs adapting for newer servers.
            # The name lands inside a quoted SQL literal, so embedded
            # quotes are doubled.
            cur.execute("""SELECT CASE
         WHEN pg_proc.proretset
         THEN 'setof ' || pg_catalog.format_type(pg_proc.prorettype, NULL)
         ELSE pg_catalog.format_type(pg_proc.prorettype, NULL) END,
         pg_proc.proargtypes,
         pg_proc.proargnames,
         pg_proc.prosrc,
         pg_proc.proallargtypes,
         pg_proc.proargmodes,
         pg_language.lanname
    FROM pg_catalog.pg_proc
         JOIN pg_catalog.pg_namespace
           ON (pg_proc.pronamespace = pg_namespace.oid)
         JOIN pg_catalog.pg_language
           ON (pg_proc.prolang = pg_language.oid)
   WHERE pg_proc.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
     AND (pg_proc.proargtypes[0] IS NULL
      OR pg_proc.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
     AND NOT pg_proc.proisagg
     AND pg_proc.proname = '%s'
     AND pg_namespace.nspname = 'public'
     AND pg_catalog.pg_function_is_visible(pg_proc.oid);""" % funname.replace("'", "''"))
            for row in cur.fetchall():
                ret_type = row[0]
                proc_body = '%s' % row[3]
                lang = row[6]
                args = _proc_args(row, argtypes_dict)

                output_line('\n\n -- >>> %s >>> --' % funname)
                output_line('CREATE FUNCTION %s(%s) RETURNS %s\nAS $$' % (funname, args, ret_type))
                was_line = 0
                for line in proc_body.split('\n'):
                    line = line.rstrip()
                    # skip blank lines only until the first non-blank one
                    if line or was_line:
                        was_line = 1
                        output_line(line)
                output_line('$$')
                output_line('  LANGUAGE %s;' % (lang))
                output_line('\n\n -- <<< %s <<< --' % funname)
    finally:
        # close the cursor even when a query fails half way
        cur.close()


def show_triggers():
    """Print the 'triggers' section: each field on its own line,
    with a separator printed after each trigger."""
    show_qry('triggers', TRIGGERS_INFO_SQL, '\n', '\n\n')


def test():
    """Run main() against a hard-coded local database (the --test option)."""
    main('127.0.0.1:music:postgres:postgres')


def main(connect_string):
    """Connect using connect_string and print all schema sections.

    User name and password are taken from the 2nd and 3rd command line
    arguments when present (init_db_conn() uses them for the Jython driver).
    With TABLES_ONLY set only the table sections are printed.
    """
    username = None
    passwd = None
    if len(sys.argv) > 2:
        username = sys.argv[2]
    if len(sys.argv) > 3:
        passwd = sys.argv[3]

    if not init_db_conn(connect_string, username, passwd):
        print('Something is terribly wrong with db connection')
        return

    init_session()
    show_tables()
    if not TABLES_ONLY:
        show_primary_keys()
        show_indexes()
        show_foreign_keys()
        show_defaults()
        show_views()
        show_triggers()
        show_procedures()
    print('\n\n--- the end ---')


# Command line entry point.  All option handling is kept under the __main__
# guard so that merely importing this module never prints or connects.
if __name__ == '__main__':
    if '--version' in sys.argv:
        print(__version__)
    elif '--test' in sys.argv:
        test()
    elif '--help' in sys.argv:
        print(USAGE)
    else:
        if '--tables_only' in sys.argv:
            TABLES_ONLY = 1
            # drop the flag so that the connect string, user and password
            # keep their positions in sys.argv wherever the flag was given
            sys.argv = [arg for arg in sys.argv if arg != '--tables_only']
        if len(sys.argv) < 2:
            print(USAGE)
        else:
            main(sys.argv[1])

Diff to Previous Revision

--- revision 6 2012-09-19 06:20:56
+++ revision 7 2014-02-21 09:17:23
@@ -1,6 +1,6 @@
 
#!/usr/bin/env python
 
# -*- coding: utf8 -*-
-__version__ = '$Id: schema_pg.py 984 2012-03-30 11:50:32Z mn $'
+__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'
 
 
# export PostgreSQL schema to text
 
# usable to compare databases that should be the same
@@ -33,7 +33,9 @@
        USE_JYTHON
= 0
 
 
-OUT_ENCODINGS = ('cp1250', 'iso8859_2')
+DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')
+
+OUT_FILE_ENCODING = 'UTF8'
 
 
 TABLE_NAMES_SQL
= """SELECT DISTINCT table_name
@@ -130,7 +132,7 @@
 """

 
 
-TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, condition_timing, event_manipulation, action_statement
+TRIGGERS_INFO_SQL = """
SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
 FROM information_schema
.triggers
 WHERE trigger_schema NOT IN
('pg_catalog', 'information_schema')
 ORDER BY
1, 2, 3, 4, 5"""
@@ -139,6 +141,7 @@
 _CONN = None
 
 TABLES = []
+
 
 def init_db_conn(connect_string, username, passwd):
        """
initializes db connections, can work with PyGres or psycopg2"""
@@ -213,19 +216,37 @@
        return _CONN
 
 
-def output_line(line):
-       try:
-               line = line.rstrip()
-               for enc in OUT_ENCODINGS:
+def output_str(fout, line):
+       """
outputs line to fout trying various encodings in case of encoding errors"""
+       if fout:
+               try:
+                       fout.write(line)
+               except (UnicodeDecodeError, UnicodeEncodeError):
                        try:
-                               line = line.encode(enc)
-                               break
+                               fout.write(line.encode(OUT_FILE_ENCODING))
                        except (UnicodeDecodeError, UnicodeEncodeError):
-                               pass
-               print(line)
-       except (UnicodeDecodeError, UnicodeEncodeError):
-               print('!!! line cannot be encoded !!!')
-               pass
+                               ok = 0
+                               for enc in DB_ENCODINGS:
+                                       try:
+                                               line2 = line.decode(enc)
+                                               #fout.write(line2.encode(OUT_FILE_ENCODING))
+                                               fout.write(line2)
+                                               ok = 1
+                                               break
+                                       except (UnicodeDecodeError, UnicodeEncodeError):
+                                               pass
+                               if not ok:
+                                       fout.write('!!! line cannot be encoded !!!\n')
+                                       fout.write(repr(line))
+               fout.write('\n')
+               fout.flush()
+
+
+def output_line(line, fout=None):
+       """
outputs line"""
+       line = line.rstrip()
+       output_str(fout, line)
+       output_str(sys.stdout, line)
 
 
 def select_qry(querystr):
@@ -238,21 +259,33 @@
                return results
 
 
-def show_qry(title, querystr, fld_join = '\t', row_separator = None):
+def fld2str(fld_v):
+       """
converts field value into string"""
+       if type(fld_v) == type(1.1):
+               fld_v = '%s' % fld_v
+               if '.' in fld_v:
+                       fld_v = fld_v.rstrip('0')
+                       fld_v = fld_v.rstrip('.')
+       else:
+               fld_v = '%s' % fld_v
+       return fld_v
+
+
+def show_qry(title, querystr, fld_join='\t', row_separator=None):
        """
prints rows from SELECT"""
        output_line('\n\n')
        output_line('--- %s ---' % title)
        rs = select_qry(querystr)
        if rs:
                for row in rs:
-                       output_line(fld_join.join([str(s) for s in row]))
+                       output_line(fld_join.join([fld2str(s) for s in row]))
                        if row_separator:
                                output_line(row_separator)
        else:
                output_line(' -- NO DATA --')
 
 
-def show_qry_tables(title, querystr, fld_join = '\t', row_separator = None):
+def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
        """
prints rows from SELECT"""
        output_line('\n\n')
        output_line('--- %s ---' % title)
@@ -269,8 +302,7 @@
                output_line(' -- NO DATA --')
 
 
-
-def show_qry_ex(querystr, table, fld_join = '\t', row_separator = None):
+def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
        rs = select_qry(querystr % table)
        if rs:
                for row in rs:
@@ -343,7 +375,7 @@
        return row[0]
 
 
-def join_arg(arg_name, arg_type, mode = 'i'):
+def join_arg(arg_name, arg_type, mode='i'):
        """
make string with procedure arguments"""
        if mode == 'o':
                out_s = 'OUT '
@@ -455,8 +487,7 @@
        main('127.0.0.1:music:postgres:postgres')
 
 
-def main():
-       connect_string = sys.argv[1]
+def main(connect_string):
        username = None
        passwd = None
        if (len(sys.argv) > 2):
@@ -492,4 +523,4 @@
                #print('arg len: %d' % (len(sys.argv)))
                print(USAGE)
        else:
-               main()
+               main(sys.argv[1])

History