#!/usr/bin/env python
# -*- coding: utf8 -*-
__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'
# export PostgreSQL schema to text
# usable to compare databases that should be the same
#
# PostgreSQL schema info:
# http://www.alberton.info/postgresql_meta_info.html
#
#
# author: Michal Niklas
# Help text for the non-Jython drivers. It is printed for --help, printed when
# no connect string is given, and embedded in the "Incorrect connect_string"
# error raised by init_db_conn(). The Jython branch below replaces it.
USAGE = 'usage:\n\tschema_pg.py connect_string\n\t\tconnect string:\n\t\t\thost:[port:]database:user:password\n\t\tor for ODBC:\n\t\t\tdatabase/user/passwd\n\t\tor (pyodbc)\n\t\t\tDriver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;'
import sys
import array
import exceptions
# Runtime switches.
# USE_JYTHON: 1 when the Jython zxJDBC driver could be imported.
# TABLES_ONLY: 1 makes main() stop after the table/column sections.
USE_JYTHON = 0
TABLES_ONLY = 0

try:
    from com.ziclix.python.sql import zxJDBC
except ImportError:
    # zxJDBC is not importable (not running under Jython): keep the
    # CPython drivers and the default USAGE text.
    USE_JYTHON = 0
else:
    USE_JYTHON = 1
    USAGE = """usage:
\tschema_pg.py jdbcurl user passwd
example:
\tjython schema_pg.py jdbc:postgresql://isof-test64:5434/gryfcard_mrb?stringtype=unspecified user passwd > db.schema 2> db.err
"""
# Encodings output_str() tries, in this order, when it has to decode a line
# that could not be written directly.
DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')
# Encoding output_str() uses for its first fallback: writing the encoded line.
OUT_FILE_ENCODING = 'UTF8'
# Distinct table_name values from information_schema.columns for schema
# 'public'. The position('_' ...) <> 1 test skips names starting with '_'.
TABLE_NAMES_SQL = """SELECT DISTINCT table_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1"""
# Distinct (table_name, column_name) pairs with the same schema/name filter.
TABLE_COLUMNS_SQL = """SELECT DISTINCT table_name, column_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2"""
# Per-column details (ordinal position, data type, nullability, character
# length, numeric precision), ordered by table name and then column NAME,
# not by ordinal position.
TABLE_INFO_SQL = """SELECT table_name, column_name, ordinal_position, data_type, is_nullable, character_maximum_length, numeric_precision
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2
"""
# One row per primary-key constraint (contype = 'p'): the table name and the
# constraint's conkey array rendered as space-separated text.
# NOTE(review): unlike the table queries, there is no schema filter here.
PRIMARY_KEYS_INFO_SQL = """SELECT t.relname AS table_name, array_to_string(c.conkey, ' ') AS constraint_key
FROM pg_constraint c
LEFT JOIN pg_class t ON c.conrelid = t.oid
WHERE c.contype = 'p'
AND position('_' in t.relname ) <> 1
ORDER BY table_name;
"""
# Template: the %s placeholder is filled with a table name. Returns the name
# and uniqueness flag of every non-primary index of that table.
INDEXES_INFO_SQL = """SELECT relname, indisunique
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND oid IN (
SELECT indexrelid
FROM pg_index, pg_class
WHERE pg_class.relname='%s'
AND pg_class.oid=pg_index.indrelid
AND indisprimary != 't'
)
ORDER BY 1, 2
"""
# Template: the %s placeholder is filled with a table name. Returns the name
# and indkey value of every non-primary index of that table; show_indexes()
# splits indkey on whitespace to get the individual attribute numbers.
INDEXES_COLUMNS_INFO1_SQL = """SELECT relname, indkey
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND pg_class.oid IN (
SELECT indexrelid
FROM pg_index, pg_class
WHERE pg_class.relname='%s'
AND pg_class.oid=pg_index.indrelid
AND indisprimary != 't'
)
ORDER BY 1
"""
# Template: the two %s placeholders are filled with (table name, attribute
# number). Resolves one attribute number of the table to its column name.
INDEXES_COLUMNS_INFO2_SQL = """SELECT DISTINCT a.attname
FROM pg_index c
LEFT JOIN pg_class t
ON c.indrelid = t.oid
LEFT JOIN pg_attribute a
ON a.attrelid = t.oid
AND a.attnum = ANY(indkey)
WHERE t.relname = '%s'
AND a.attnum = %s
"""
# One row per foreign-key constraint (contype = 'f'): the owning table and
# the table it references. Owning tables starting with '_' are skipped.
FOREIGN_KEYS_INFO_SQL = """SELECT t.relname AS table_name, t2.relname AS references_table
FROM pg_constraint c
LEFT JOIN pg_class t ON c.conrelid = t.oid
LEFT JOIN pg_class t2 ON c.confrelid = t2.oid
WHERE c.contype = 'f'
AND position('_' in t.relname ) <> 1
ORDER BY table_name, references_table
"""
# Columns that have a default expression.
# NOTE(review): no table_schema filter here, unlike TABLE_NAMES_SQL - confirm
# whether columns from other schemas are meant to be included.
DEFAULTS_INFO_SQL = """SELECT table_name, column_name, column_default
FROM information_schema.columns
WHERE column_default IS NOT NULL
AND position('_' in table_name) <> 1
ORDER BY table_name, column_name"""
# Name and definition of every view outside pg_catalog / information_schema.
VIEWS_INFO_SQL = """SELECT viewname, definition
FROM pg_views
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY viewname
"""
# Triggers outside pg_catalog / information_schema: table, trigger name,
# orientation, timing, event and the action statement.
TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3, 4, 5"""
# Connection shared by the whole module: set by init_db_conn(), read through
# db_conn().
_CONN = None
# Table names appended by show_tables() and iterated by the per-table queries.
TABLES = []
def init_db_conn(connect_string, username, passwd):
    """Open the module-wide database connection and return it.

    The driver is picked from the form of connect_string, in this order:
      * when USE_JYTHON is set: a JDBC URL, opened through zxJDBC together
        with username and passwd;
      * a string containing '/': passed to the odbc module;
      * a string starting with 'Driver=': passed to pyodbc;
      * anything else: parsed as 'host:[port:]database:user:password' and
        opened with psycopg2, or with PyGreSQL (pgdb) when psycopg2 cannot
        be imported.

    username and passwd are only used by the Jython branch.

    Returns the connection, or None when anything went wrong (the error is
    printed together with the connection info).
    """
    global _CONN
    dbinfo = connect_string
    try:
        # NOTE(review): this echo and print(dsn) below write the password to
        # stdout, which is usually redirected into the schema file. Kept to
        # preserve the existing output; consider masking it.
        print(dbinfo)
        if USE_JYTHON:
            _CONN = zxJDBC.connect(connect_string, username, passwd, 'org.postgresql.Driver')
        elif '/' in connect_string:
            import odbc
            _CONN = odbc.odbc(connect_string)
            print(_CONN)
        elif connect_string.startswith('Driver='):
            import pyodbc
            # Driver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;
            _CONN = pyodbc.connect(connect_string)
            print(_CONN)
        else:
            # 'host:[port:]database:user:password'
            arr = connect_string.split(':')
            if len(arr) > 4:
                host = arr[0]
                port = int(arr[1])
                dbname = arr[2]
                user = arr[3]
                # Re-join the tail so a password containing ':' is not cut.
                passwd = ':'.join(arr[4:])
            elif len(arr) == 4:
                host, dbname, user, passwd = arr
                port = -1
            else:
                raise ImportError('Incorrect connect_string!\n\n%s' % (USAGE))
            if port > 0:
                sport = 'port=%d' % (port)
                # pgdb takes the port as part of the host ('host:port').
                pgdb_host = '%s:%d' % (host, port)
            else:
                sport = ''
                pgdb_host = host
            dsn = "host=%s %s dbname=%s user=%s password=%s" % (host, sport, dbname, user, passwd)
            print(dsn)
            dbinfo = 'db: %s:%s' % (host, dbname)
            use_pgdb = 0
            try:
                import psycopg2
            except ImportError:
                try:
                    import pgdb
                    use_pgdb = 1
                except ImportError:
                    raise ImportError('No PostgreSQL library, install psycopg2 or PyGres!')
            if not _CONN:
                print(dbinfo)
                if use_pgdb:
                    _CONN = pgdb.connect(database=dbname, host=pgdb_host, user=user, password=passwd)
                    print(_CONN)
                else:
                    _CONN = psycopg2.connect(dsn)
                    print(_CONN)
    except (KeyboardInterrupt, SystemExit):
        # Never turn an interrupt or an exit request into "connection failed".
        raise
    except:
        # Deliberately broad: driver errors differ per backend (under Jython
        # they may not derive from Exception), and every one of them is
        # reported the same way.
        ex = sys.exc_info()
        s = 'Exception: %s: %s\n%s' % (ex[0], ex[1], dbinfo)
        print(s)
        return None
    return _CONN
def db_conn():
    """Return the shared connection held in _CONN (None until init_db_conn sets it)."""
    return _CONN
def output_str(fout, line):
    """Write line followed by a newline to fout and flush it.

    Does nothing when fout is falsy. If writing the line raises an encoding
    error, the line encoded with OUT_FILE_ENCODING is tried, then the line
    decoded with each of DB_ENCODINGS in turn; when nothing works a marker
    line and repr(line) are written instead.
    """
    if not fout:
        return
    encoding_errors = (UnicodeDecodeError, UnicodeEncodeError)
    try:
        fout.write(line)
    except encoding_errors:
        try:
            fout.write(line.encode(OUT_FILE_ENCODING))
        except encoding_errors:
            for codec in DB_ENCODINGS:
                try:
                    fout.write(line.decode(codec))
                except encoding_errors:
                    continue
                # first codec that worked wins
                break
            else:
                # no codec could decode the line
                fout.write('!!! line cannot be encoded !!!\n')
                fout.write(repr(line))
    fout.write('\n')
    fout.flush()
def output_line(line, fout=None):
    """Strip trailing whitespace from line and send it to fout and to stdout.

    fout is handed to output_str() first, which skips it when it is falsy.
    """
    stripped = line.rstrip()
    for stream in (fout, sys.stdout):
        output_str(stream, stripped)
def select_qry(querystr):
    """Execute querystr on the shared connection and return all rows.

    Returns None when querystr is empty. The cursor is closed even when
    execute() or fetchall() raises.
    """
    if not querystr:
        return None
    cur = db_conn().cursor()
    try:
        cur.execute(querystr)
        return cur.fetchall()
    finally:
        cur.close()
def fld2str(fld_v):
    """Convert a field value into a string.

    Floats lose a redundant fractional tail ('2.0' -> '2', '2.50' -> '2.5').
    Values printed in exponent notation are left alone: stripping zeros
    there would eat the exponent ('1.5e+20' must not become '1.5e+2').
    Every other value is formatted with '%s'.
    """
    # The one-element tuple keeps tuple values from being unpacked by '%'.
    text = '%s' % (fld_v,)
    if isinstance(fld_v, float) and '.' in text and 'e' not in text.lower():
        text = text.rstrip('0').rstrip('.')
    return text
def show_qry(title, querystr, fld_join='\t', row_separator=None):
    """Print a titled section with the rows returned by querystr.

    Fields of a row are converted with fld2str() and joined with fld_join;
    row_separator, when given, is printed after every row. A placeholder
    line is printed when the query yields nothing.
    """
    output_line('\n\n')
    output_line('--- %s ---' % title)
    records = select_qry(querystr)
    if not records:
        output_line(' -- NO DATA --')
        return
    for record in records:
        output_line(fld_join.join(map(fld2str, record)))
        if row_separator:
            output_line(row_separator)
def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
    """Print a titled section with the rows of querystr run once per table.

    querystr is a template with one %s placeholder that receives each name
    from TABLES. Fields are converted with fld2str(), the same as in
    show_qry(), so non-ASCII values do not break the conversion. A
    placeholder line is printed when no table produced a row.
    """
    output_line('\n\n')
    output_line('--- %s ---' % title)
    cnt = 0
    for tbl in TABLES:
        rs = select_qry(querystr % tbl)
        if not rs:
            continue
        for row in rs:
            output_line(fld_join.join([fld2str(s) for s in row]))
            cnt += 1
            if row_separator:
                output_line(row_separator)
    if cnt == 0:
        output_line(' -- NO DATA --')
def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
    """Print the rows of querystr for one table, each prefixed by the table name.

    querystr is a template with one %s placeholder that receives table.
    Fields are converted with fld2str(), the same as in show_qry();
    row_separator, when given, is printed after every row. Nothing is
    printed when the query returns no rows.
    """
    rs = select_qry(querystr % table)
    if not rs:
        return
    for row in rs:
        fields = fld_join.join([fld2str(s) for s in row])
        output_line("%s%s%s" % (table, fld_join, fields))
        if row_separator:
            output_line(row_separator)
def init_session():
    """Hook for per-session database settings (e.g. locale); currently a no-op."""
def show_tables():
    """Fill TABLES with the table names, then print the three table sections.

    The names are appended to the module-level TABLES list, which the
    per-table index queries iterate later.
    """
    cursor = db_conn().cursor()
    cursor.execute(TABLE_NAMES_SQL)
    TABLES.extend(record[0] for record in cursor.fetchall())
    cursor.close()
    sections = (
        ('tables', TABLE_NAMES_SQL),
        ('table columns', TABLE_COLUMNS_SQL),
        ('columns', TABLE_INFO_SQL),
    )
    for title, sql in sections:
        show_qry(title, sql)
def show_primary_keys():
    """Print the 'primary keys' section."""
    show_qry(title='primary keys', querystr=PRIMARY_KEYS_INFO_SQL)
def show_indexes():
    """Print the 'indexes' and 'indexes columns' sections.

    For every name in TABLES the non-primary indexes are listed first. The
    second section resolves each attribute number found in an index's indkey
    to a column name and prints one 'table<TAB>index<TAB>column' line per
    column. The cursor used for that is closed even when a query fails.
    """
    output_line('\n\n')
    output_line('--- %s ---' % 'indexes')
    for tbl in TABLES:
        show_qry_ex(INDEXES_INFO_SQL, tbl)
    output_line('\n\n')
    output_line('--- %s ---' % 'indexes columns')
    cur = db_conn().cursor()
    try:
        for tbl in TABLES:
            cur.execute(INDEXES_COLUMNS_INFO1_SQL % tbl)
            # fetchall() materializes the index rows, so the same cursor can
            # be reused for the per-column lookups below.
            for idx_row in cur.fetchall():
                idxname = idx_row[0]
                idxflds = '%s' % idx_row[1]
                for fld in idxflds.split():
                    cur.execute(INDEXES_COLUMNS_INFO2_SQL % (tbl, fld))
                    for col_row in cur.fetchall():
                        output_line("%s\t%s\t%s" % (tbl, idxname, col_row[0]))
    finally:
        cur.close()
def show_foreign_keys():
    """Print the 'foreign keys' section."""
    show_qry(title='foreign keys', querystr=FOREIGN_KEYS_INFO_SQL)
def show_defaults():
    """Print the 'defaults' section (columns with a default expression)."""
    show_qry(title='defaults', querystr=DEFAULTS_INFO_SQL)
def show_views():
    """Print the 'views' section: one field per line, a blank separator after each view."""
    show_qry(title='views', querystr=VIEWS_INFO_SQL, fld_join='\n', row_separator='\n\n')
def get_arg_type(at):
    """Return the type name PostgreSQL's format_type() gives for type id at.

    at is interpolated into the SQL text as a quoted literal. The cursor is
    closed whether or not the query succeeds.
    """
    cur = db_conn().cursor()
    try:
        cur.execute("select pg_catalog.format_type('%s', NULL)" % at)
        row = cur.fetchone()
    finally:
        cur.close()
    return row[0]
def join_arg(arg_name, arg_type, mode='i'):
    """Format one procedure argument as '<name> <type>'.

    Mode 'o' adds an 'OUT ' prefix; every other mode adds nothing.
    """
    prefix = 'OUT ' if mode == 'o' else ''
    return '%s%s %s' % (prefix, arg_name, arg_type)
def get_subfields(subfield_fld):
    """Turn a database array value into a list of strings.

    Drivers return arrays in different shapes, for example the text
    '{code,status_str}', a Python list or an array.array. Sequences are
    converted element by element; text has the characters { } [ ] '
    removed and is split on commas. Surrounding whitespace is stripped from
    every element, so 'i, o' and ['i', ' o'] both give ['i', 'o'].

    Returns None when subfield_fld is empty or None.
    """
    if not subfield_fld:
        return None
    if isinstance(subfield_fld, (list, tuple, array.array)):
        # Format each element on its own: going through repr() of the whole
        # sequence leaves ', ' separators and u'' prefixes in the result.
        return [('%s' % (item,)).strip() for item in subfield_fld]
    text = '%s' % (subfield_fld,)
    for ch in "{}[]'":
        text = text.replace(ch, '')
    return [item.strip() for item in text.split(',')]
def _proc_args(row, argtypes_dict):
    """Build the 'name type, ...' argument text for one pg_proc row.

    argtypes_dict caches type id -> type name lookups between calls.
    Returns '' when the row has no argument types or no argument names.
    """
    argtypes_str = '%s' % row[1]
    if not argtypes_str:
        return ''
    argmodes = get_subfields(row[5])
    argtypes_nrs = get_subfields(row[4])
    if not argtypes_nrs:
        # proallargtypes is empty: fall back to the input types and treat
        # every argument as an input argument.
        argtypes_nrs = argtypes_str.split()
        argmodes = 'i' * len(argtypes_nrs)
    elif not argmodes:
        # zip() below cannot take None
        argmodes = 'i' * len(argtypes_nrs)
    argtypes = []
    for at in argtypes_nrs:
        at = at.strip()
        if at:
            if at not in argtypes_dict:
                argtypes_dict[at] = get_arg_type(at)
            argtypes.append(argtypes_dict[at])
    argnames = get_subfields(row[2])
    if not argnames:
        return ''
    return ', '.join([join_arg(a.strip(), t, m.strip())
                      for (a, t, m) in zip(argnames, argtypes, argmodes)])


def show_procedures():
    """Print a CREATE FUNCTION text for every routine outside the system schemas.

    Routine names come from information_schema.routines; for each name the
    matching pg_proc rows in schema 'public' are printed with return type,
    arguments, source and language. Leading blank lines of the source are
    dropped. The cursor is closed even when a query fails.
    """
    # NOTE(review): pg_proc.proisagg was removed in PostgreSQL 11 (replaced by
    # prokind); the query below fails on such servers - confirm the supported
    # server versions.
    proc_sql = """SELECT CASE
WHEN pg_proc.proretset
THEN 'setof ' || pg_catalog.format_type(pg_proc.prorettype, NULL)
ELSE pg_catalog.format_type(pg_proc.prorettype, NULL) END,
pg_proc.proargtypes,
pg_proc.proargnames,
pg_proc.prosrc,
pg_proc.proallargtypes,
pg_proc.proargmodes,
pg_language.lanname
FROM pg_catalog.pg_proc
JOIN pg_catalog.pg_namespace
ON (pg_proc.pronamespace = pg_namespace.oid)
JOIN pg_catalog.pg_language
ON (pg_proc.prolang = pg_language.oid)
WHERE pg_proc.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
AND (pg_proc.proargtypes[0] IS NULL
OR pg_proc.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
AND NOT pg_proc.proisagg
AND pg_proc.proname = '%s'
AND pg_namespace.nspname = 'public'
AND pg_catalog.pg_function_is_visible(pg_proc.oid);"""
    argtypes_dict = {}
    output_line('\n\n --- procedures ---')
    cur = db_conn().cursor()
    try:
        cur.execute("""SELECT DISTINCT routine_name
FROM information_schema.routines
WHERE specific_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1""")
        rows = cur.fetchall()
        for rt_row in rows:
            funname = rt_row[0]
            cur.execute(proc_sql % funname)
            for row in cur.fetchall():
                ret_type = row[0]
                proc_body = '%s' % row[3]
                lang = row[6]
                args = _proc_args(row, argtypes_dict)
                output_line('\n\n -- >>> %s >>> --' % funname)
                lines = proc_body.split('\n')
                output_line('CREATE FUNCTION %s(%s) RETURNS %s\nAS $$' % (funname, args, ret_type))
                was_line = 0
                for line in lines:
                    line = line.rstrip()
                    # skip blank lines until the first non-blank one
                    if line or was_line:
                        was_line = 1
                        output_line(line)
                output_line('$$')
                output_line(' LANGUAGE %s;' % (lang))
                output_line('\n\n -- <<< %s <<< --' % funname)
    finally:
        cur.close()
def show_triggers():
    """Print the 'triggers' section: one field per line, a blank separator after each trigger."""
    show_qry(title='triggers', querystr=TRIGGERS_INFO_SQL, fld_join='\n', row_separator='\n\n')
def test():
    """Run main() with a hard-coded connect string for a local 'music' database."""
    main(connect_string='127.0.0.1:music:postgres:postgres')
def main(connect_string):
    """Connect using connect_string and print the schema sections.

    User name and password for the connection are taken from sys.argv[2]
    and sys.argv[3] when present. Only the table sections are printed when
    TABLES_ONLY is set.
    """
    extra = sys.argv[2:4]
    username = extra[0] if len(extra) > 0 else None
    passwd = extra[1] if len(extra) > 1 else None
    if not init_db_conn(connect_string, username, passwd):
        print('Something is terribly wrong with db connection')
        return
    init_session()
    show_tables()
    if not TABLES_ONLY:
        sections = (
            show_primary_keys,
            show_indexes,
            show_foreign_keys,
            show_defaults,
            show_views,
            show_triggers,
            show_procedures,
        )
        for show_section in sections:
            show_section()
    print('\n\n--- the end ---')
# Command-line entry point. Everything is guarded by __name__ so that
# importing this module never reacts to the importing program's arguments.
if __name__ == '__main__':
    if '--version' in sys.argv:
        print(__version__)
    elif '--test' in sys.argv:
        test()
    elif '--help' in sys.argv:
        print(USAGE)
    else:
        if '--tables_only' in sys.argv:
            TABLES_ONLY = 1
        # Drop option flags so they are never mistaken for the connect string
        # or for the user/password that main() reads from sys.argv[2:4].
        sys.argv[1:] = [arg for arg in sys.argv[1:] if not arg.startswith('--')]
        if len(sys.argv) < 2:
            print(USAGE)
        else:
            main(sys.argv[1])
Diff to Previous Revision
--- revision 6 2012-09-19 06:20:56
+++ revision 7 2014-02-21 09:17:23
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf8 -*-
-__version__ = '$Id: schema_pg.py 984 2012-03-30 11:50:32Z mn $'
+__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'
# export PostgreSQL schema to text
# usable to compare databases that should be the same
@@ -33,7 +33,9 @@
USE_JYTHON = 0
-OUT_ENCODINGS = ('cp1250', 'iso8859_2')
+DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')
+
+OUT_FILE_ENCODING = 'UTF8'
TABLE_NAMES_SQL = """SELECT DISTINCT table_name
@@ -130,7 +132,7 @@
"""
-TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, condition_timing, event_manipulation, action_statement
+TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3, 4, 5"""
@@ -139,6 +141,7 @@
_CONN = None
TABLES = []
+
def init_db_conn(connect_string, username, passwd):
"""initializes db connections, can work with PyGres or psycopg2"""
@@ -213,19 +216,37 @@
return _CONN
-def output_line(line):
- try:
- line = line.rstrip()
- for enc in OUT_ENCODINGS:
+def output_str(fout, line):
+ """outputs line to fout trying various encodings in case of encoding errors"""
+ if fout:
+ try:
+ fout.write(line)
+ except (UnicodeDecodeError, UnicodeEncodeError):
try:
- line = line.encode(enc)
- break
+ fout.write(line.encode(OUT_FILE_ENCODING))
except (UnicodeDecodeError, UnicodeEncodeError):
- pass
- print(line)
- except (UnicodeDecodeError, UnicodeEncodeError):
- print('!!! line cannot be encoded !!!')
- pass
+ ok = 0
+ for enc in DB_ENCODINGS:
+ try:
+ line2 = line.decode(enc)
+ #fout.write(line2.encode(OUT_FILE_ENCODING))
+ fout.write(line2)
+ ok = 1
+ break
+ except (UnicodeDecodeError, UnicodeEncodeError):
+ pass
+ if not ok:
+ fout.write('!!! line cannot be encoded !!!\n')
+ fout.write(repr(line))
+ fout.write('\n')
+ fout.flush()
+
+
+def output_line(line, fout=None):
+ """outputs line"""
+ line = line.rstrip()
+ output_str(fout, line)
+ output_str(sys.stdout, line)
def select_qry(querystr):
@@ -238,21 +259,33 @@
return results
-def show_qry(title, querystr, fld_join = '\t', row_separator = None):
+def fld2str(fld_v):
+ """converts field value into string"""
+ if type(fld_v) == type(1.1):
+ fld_v = '%s' % fld_v
+ if '.' in fld_v:
+ fld_v = fld_v.rstrip('0')
+ fld_v = fld_v.rstrip('.')
+ else:
+ fld_v = '%s' % fld_v
+ return fld_v
+
+
+def show_qry(title, querystr, fld_join='\t', row_separator=None):
"""prints rows from SELECT"""
output_line('\n\n')
output_line('--- %s ---' % title)
rs = select_qry(querystr)
if rs:
for row in rs:
- output_line(fld_join.join([str(s) for s in row]))
+ output_line(fld_join.join([fld2str(s) for s in row]))
if row_separator:
output_line(row_separator)
else:
output_line(' -- NO DATA --')
-def show_qry_tables(title, querystr, fld_join = '\t', row_separator = None):
+def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
"""prints rows from SELECT"""
output_line('\n\n')
output_line('--- %s ---' % title)
@@ -269,8 +302,7 @@
output_line(' -- NO DATA --')
-
-def show_qry_ex(querystr, table, fld_join = '\t', row_separator = None):
+def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
rs = select_qry(querystr % table)
if rs:
for row in rs:
@@ -343,7 +375,7 @@
return row[0]
-def join_arg(arg_name, arg_type, mode = 'i'):
+def join_arg(arg_name, arg_type, mode='i'):
"""make string with procedure arguments"""
if mode == 'o':
out_s = 'OUT '
@@ -455,8 +487,7 @@
main('127.0.0.1:music:postgres:postgres')
-def main():
- connect_string = sys.argv[1]
+def main(connect_string):
username = None
passwd = None
if (len(sys.argv) > 2):
@@ -492,4 +523,4 @@
#print('arg len: %d' % (len(sys.argv)))
print(USAGE)
else:
- main()
+ main(sys.argv[1])