Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
# -*- coding: utf8 -*-
__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'

# export PostgreSQL schema to text
# usable to compare databases that should be the same
#
# PostgreSQL schema info:
# http://www.alberton.info/postgresql_meta_info.html
#
#
# author: Michal Niklas

USAGE = 'usage:\n\tschema_pg.py connect_string\n\t\tconnect string:\n\t\t\thost:[port:]database:user:password\n\t\tor for ODBC:\n\t\t\tdatabase/user/passwd\n\t\tor (pyodbc)\n\t\t\tDriver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;'

import sys
import array
import exceptions

USE_JYTHON = 0

TABLES_ONLY = 0

try:
	from com.ziclix.python.sql import zxJDBC
	USE_JYTHON = 1
	USAGE = """usage:
\tschema_inf.py jdbcurl user passwd
example:
\tjython schema_py.py jdbc:postgresql://isof-test64:5434/gryfcard_mrb?stringtype=unspecified user passwd > db.schema 2> db.err
"""
except:
	USE_JYTHON = 0


DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')

OUT_FILE_ENCODING = 'UTF8'


TABLE_NAMES_SQL = """SELECT DISTINCT table_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1"""


TABLE_COLUMNS_SQL = """SELECT DISTINCT table_name, column_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2"""


TABLE_INFO_SQL = """SELECT table_name, column_name, ordinal_position, data_type, is_nullable, character_maximum_length, numeric_precision
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2
"""

PRIMARY_KEYS_INFO_SQL = """SELECT t.relname AS table_name, array_to_string(c.conkey, ' ') AS constraint_key
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
WHERE c.contype = 'p'
AND position('_' in t.relname ) <> 1
ORDER BY table_name;
"""


INDEXES_INFO_SQL = """SELECT relname, indisunique
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
     )
ORDER BY 1, 2
"""


INDEXES_COLUMNS_INFO1_SQL = """SELECT relname, indkey
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND pg_class.oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
)
ORDER BY 1
"""


INDEXES_COLUMNS_INFO2_SQL = """SELECT DISTINCT a.attname
     FROM pg_index c
LEFT JOIN pg_class t
       ON c.indrelid  = t.oid
LEFT JOIN pg_attribute a
       ON a.attrelid = t.oid
      AND a.attnum = ANY(indkey)
    WHERE t.relname = '%s'
      AND a.attnum = %s
"""


FOREIGN_KEYS_INFO_SQL = """SELECT t.relname AS table_name, t2.relname AS references_table
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
LEFT JOIN pg_class t2 ON c.confrelid = t2.oid
WHERE c.contype = 'f'
AND position('_' in t.relname ) <> 1
ORDER BY table_name, references_table
"""


DEFAULTS_INFO_SQL = """SELECT table_name, column_name, column_default
FROM information_schema.columns
WHERE column_default IS NOT NULL
AND position('_' in table_name) <> 1
ORDER BY table_name, column_name"""


VIEWS_INFO_SQL = """SELECT viewname, definition
FROM pg_views
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY viewname
"""


TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3, 4, 5"""


_CONN = None

TABLES = []


def init_db_conn(connect_string, username, passwd):
	"""initializes db connections, can work with PyGres or psycopg2"""
	global _CONN
	try:
		dbinfo = connect_string
		print(dbinfo)
		if USE_JYTHON:
			_CONN = zxJDBC.connect(connect_string, username, passwd, 'org.postgresql.Driver')
		elif '/' in connect_string:
			import odbc
			_CONN = odbc.odbc(connect_string)
			print(_CONN)
		elif connect_string.startswith('Driver='):
			import pyodbc
			# Driver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;
			# Driver={PostgreSQL};Server=isof-test64;Port=5435;Database=isof_stable;Uid=postgres;Pwd=postgres;
			_CONN = pyodbc.connect(connect_string)
			print(_CONN)
		else:
			# 'host:[port]:database:user:password'
			arr = connect_string.split(':')
			if len(arr) > 4:
				host = '%s:%s' % (arr[0], arr[1])
				port = int(arr[1])
				dbname = arr[2]
				user = arr[3]
				passwd = arr[4]
			elif len(arr) == 4:
				host = arr[0]
				port = -1
				dbname = arr[1]
				user = arr[2]
				passwd = arr[3]
			else:
				raise exceptions.ImportError('Incorrect connect_string!\n\n%s' % (USAGE))
			if port > 0:
				host = host.split(':')[0]
				sport = 'port=%d' % (port)
			else:
				sport = ''
			dsn = "host=%s %s dbname=%s user=%s password=%s" % (host, sport, dbname, user, passwd)
			print(dsn)
			dbinfo = 'db: %s:%s' % (host, dbname)
			use_pgdb = 0
			try:
				import psycopg2
			except:
				try:
					import pgdb
					use_pgdb = 1
				except:
					raise exceptions.ImportError('No PostgreSQL library, install psycopg2 or PyGres!')
			if not _CONN:
				print(dbinfo)
				if use_pgdb:
					_CONN = pgdb.connect(database=dbname, host=host, user=user, password=passwd)
					print(_CONN)
				else:
					_CONN = psycopg2.connect(dsn)
					print(_CONN)
	except:
		ex = sys.exc_info()
		s = 'Exception: %s: %s\n%s' % (ex[0], ex[1], dbinfo)
		print(s)
		return None
	return _CONN


def db_conn():
	"""access to global db connection"""
	return _CONN


def output_str(fout, line):
	"""outputs line to fout trying various encodings in case of encoding errors"""
	if fout:
		try:
			fout.write(line)
		except (UnicodeDecodeError, UnicodeEncodeError):
			try:
				fout.write(line.encode(OUT_FILE_ENCODING))
			except (UnicodeDecodeError, UnicodeEncodeError):
				ok = 0
				for enc in DB_ENCODINGS:
					try:
						line2 = line.decode(enc)
						#fout.write(line2.encode(OUT_FILE_ENCODING))
						fout.write(line2)
						ok = 1
						break
					except (UnicodeDecodeError, UnicodeEncodeError):
						pass
				if not ok:
					fout.write('!!! line cannot be encoded !!!\n')
					fout.write(repr(line))
		fout.write('\n')
		fout.flush()


def output_line(line, fout=None):
	"""outputs line"""
	line = line.rstrip()
	output_str(fout, line)
	output_str(sys.stdout, line)


def select_qry(querystr):
	"""return rows from SELECT"""
	if querystr:
		cur = db_conn().cursor()
		cur.execute(querystr)
		results = cur.fetchall()
		cur.close()
		return results


def fld2str(fld_v):
	"""converts field value into string"""
	if type(fld_v) == type(1.1):
		fld_v = '%s' % fld_v
		if '.' in fld_v:
			fld_v = fld_v.rstrip('0')
			fld_v = fld_v.rstrip('.')
	else:
		fld_v = '%s' % fld_v
	return fld_v


def show_qry(title, querystr, fld_join='\t', row_separator=None):
	"""prints rows from SELECT"""
	output_line('\n\n')
	output_line('--- %s ---' % title)
	rs = select_qry(querystr)
	if rs:
		for row in rs:
			output_line(fld_join.join([fld2str(s) for s in row]))
			if row_separator:
				output_line(row_separator)
	else:
		output_line(' -- NO DATA --')


def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
	"""prints rows from SELECT"""
	output_line('\n\n')
	output_line('--- %s ---' % title)
	cnt = 0
	for tbl in TABLES:
		rs = select_qry(querystr % tbl)
		if rs:
			for row in rs:
				output_line(fld_join.join([str(s) for s in row]))
				cnt += 1
				if row_separator:
					output_line(row_separator)
	if cnt == 0:
		output_line(' -- NO DATA --')


def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
	rs = select_qry(querystr % table)
	if rs:
		for row in rs:
			output_line("%s%s%s" % (table, fld_join, fld_join.join([str(s) for s in row])))
			if row_separator:
				output_line(row_separator)


def init_session():
	"""place to change db session settings like locale"""
	pass


def show_tables():
	cur = db_conn().cursor()
	cur.execute(TABLE_NAMES_SQL)
	for row in cur.fetchall():
		TABLES.append(row[0])
	cur.close()
	show_qry('tables', TABLE_NAMES_SQL)
	show_qry('table columns', TABLE_COLUMNS_SQL)
	show_qry('columns', TABLE_INFO_SQL)


def show_primary_keys():
	show_qry('primary keys', PRIMARY_KEYS_INFO_SQL)


def show_indexes():
	output_line('\n\n')
	output_line('--- %s ---' % 'indexes')
	for tbl in TABLES:
		show_qry_ex(INDEXES_INFO_SQL, tbl)
	#show_qry('indexes columns', INDEXES_COLUMNS_INFO_SQL)
	output_line('\n\n')
	output_line('--- %s ---' % 'indexes columns')
	cur = db_conn().cursor()
	for tbl in TABLES:
		#print
		#print tbl
		cur.execute(INDEXES_COLUMNS_INFO1_SQL % tbl)
		for row in cur.fetchall():
			idxname = row[0]
			idxflds = '%s' % row[1]
			#print '\n', tbl, '\t', idxname, '\t', idxflds
			for fld in idxflds.split():
				sql = INDEXES_COLUMNS_INFO2_SQL % (tbl, fld)
				cur.execute(sql)
				for row in cur.fetchall():
					output_line("%s\t%s\t%s" % (tbl, idxname, row[0]))


def show_foreign_keys():
	show_qry('foreign keys', FOREIGN_KEYS_INFO_SQL)


def show_defaults():
	show_qry('defaults', DEFAULTS_INFO_SQL)


def show_views():
	show_qry('views', VIEWS_INFO_SQL, '\n', '\n\n')


def get_arg_type(at):
	"""returns type name from type id"""
	cur = db_conn().cursor()
	cur.execute("select pg_catalog.format_type('%s', NULL)" % at)
	row = cur.fetchone()
	return row[0]


def join_arg(arg_name, arg_type, mode='i'):
	"""make string with procedure arguments"""
	if mode == 'o':
		out_s = 'OUT '
	else:
		out_s = ''
	return '%s%s %s' % (out_s, arg_name, arg_type)


def get_subfields(subfield_fld):
	"""changes function arguments names into array of strings, for example:
	   ['code', 'status_str', 'err_desctiption'],
	   various drivers return it as various types"""
	if subfield_fld:
		subfield = subfield_fld
		if isinstance(subfield_fld, array.array):
			subfield = ','.join([str(s) for s in subfield_fld])
			#print('subfield arr: >>%s<<' % (subfield_fld))
			subfield = '%s' % (subfield)
		else:
			subfield = '%s' % (subfield_fld)
		subfield = subfield.replace('{', '')
		subfield = subfield.replace('}', '')
		subfield = subfield.replace('[', '')
		subfield = subfield.replace(']', '')
		subfield = subfield.replace("'", '')
		#print('subfield: >>%s<<' % (subfield))
		arr = subfield.split(',')
		if arr and len(arr) > 0:
			return arr
	return None


def show_procedures():
	argtypes_dict = {}
	output_line('\n\n --- procedures ---')
	cur = db_conn().cursor()
	cur.execute("""SELECT DISTINCT routine_name
        FROM information_schema.routines
        WHERE specific_schema NOT IN ('pg_catalog', 'information_schema')
        ORDER BY 1""")
	rows = cur.fetchall()
	for rt_row in rows:
		funname = rt_row[0]
		cur.execute("""SELECT CASE
         WHEN pg_proc.proretset
         THEN 'setof ' || pg_catalog.format_type(pg_proc.prorettype, NULL)
         ELSE pg_catalog.format_type(pg_proc.prorettype, NULL) END,
         pg_proc.proargtypes,
         pg_proc.proargnames,
         pg_proc.prosrc,
         pg_proc.proallargtypes,
         pg_proc.proargmodes,
         pg_language.lanname
    FROM pg_catalog.pg_proc
         JOIN pg_catalog.pg_namespace
           ON (pg_proc.pronamespace = pg_namespace.oid)
         JOIN pg_catalog.pg_language
           ON (pg_proc.prolang = pg_language.oid)
   WHERE pg_proc.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
     AND (pg_proc.proargtypes[0] IS NULL
      OR pg_proc.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
     AND NOT pg_proc.proisagg
     AND pg_proc.proname = '%s'
     AND pg_namespace.nspname = 'public'
     AND pg_catalog.pg_function_is_visible(pg_proc.oid);""" % funname)
		for row in cur.fetchall():
			ret_type = row[0]
			args = ''
			argtypes = []
			argmodes = get_subfields(row[5])
			argtypes_str = '%s' % row[1]
			proc_body = '%s' % row[3]
			lang = row[6]
			argtypes_nrs = get_subfields(row[4])
			if argtypes_str:
				if not argtypes_nrs:
					argtypes_nrs = argtypes_str.split()
					argmodes = 'i' * len(argtypes_nrs)
				for at in argtypes_nrs:
					if at:
						if not at in argtypes_dict.keys():
							argtypes_dict[at] = get_arg_type(at)
						ats = argtypes_dict[at]
						argtypes.append(ats)
				argnames = get_subfields(row[2])
				if argnames:
					args = ', '.join([join_arg(a, t, m) for (a, t, m) in zip(argnames, argtypes, argmodes)])

			output_line('\n\n -- >>> %s >>> --' % funname)
			lines = proc_body.split('\n')
			output_line('CREATE FUNCTION %s(%s) RETURNS %s\nAS $$' % (funname, args, ret_type))
			was_line = 0
			for line in lines:
				line = line.rstrip()
				if line or was_line:
					was_line = 1
					output_line(line)
			output_line('$$')
			output_line('  LANGUAGE %s;' % (lang))
			output_line('\n\n -- <<< %s <<< --' % funname)
	cur.close()


def show_triggers():
	show_qry('triggers', TRIGGERS_INFO_SQL, '\n', '\n\n')


def test():
	main('127.0.0.1:music:postgres:postgres')


def main(connect_string):
	username = None
	passwd = None
	if (len(sys.argv) > 2):
		username = sys.argv[2]
	if (len(sys.argv) > 3):
		passwd = sys.argv[3]
	if not init_db_conn(connect_string, username, passwd):
		print('Something is terribly wrong with db connection')
	else:
		init_session()
		show_tables()
		if not TABLES_ONLY:
			show_primary_keys()
			show_indexes()
			show_foreign_keys()
			show_defaults()
			show_views()
			show_triggers()
			show_procedures()
		print('\n\n--- the end ---')


if '--version' in sys.argv:
	print(__version__)
elif '--test' in sys.argv:
	test()
elif '--help' in sys.argv:
	print(USAGE)
elif __name__ == '__main__':
	if '--tables_only' in sys.argv:
		TABLES_ONLY = 1
	if len(sys.argv) < 2:
		#print('arg len: %d' % (len(sys.argv)))
		print(USAGE)
	else:
		main(sys.argv[1])

Diff to Previous Revision

--- revision 6 2012-09-19 06:20:56
+++ revision 7 2014-02-21 09:17:23
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf8 -*-
-__version__ = '$Id: schema_pg.py 984 2012-03-30 11:50:32Z mn $'
+__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'
 
 # export PostgreSQL schema to text
 # usable to compare databases that should be the same
@@ -33,7 +33,9 @@
 	USE_JYTHON = 0
 
 
-OUT_ENCODINGS = ('cp1250', 'iso8859_2')
+DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')
+
+OUT_FILE_ENCODING = 'UTF8'
 
 
 TABLE_NAMES_SQL = """SELECT DISTINCT table_name
@@ -130,7 +132,7 @@
 """
 
 
-TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, condition_timing, event_manipulation, action_statement
+TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
 FROM information_schema.triggers
 WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
 ORDER BY 1, 2, 3, 4, 5"""
@@ -139,6 +141,7 @@
 _CONN = None
 
 TABLES = []
+
 
 def init_db_conn(connect_string, username, passwd):
 	"""initializes db connections, can work with PyGres or psycopg2"""
@@ -213,19 +216,37 @@
 	return _CONN
 
 
-def output_line(line):
-	try:
-		line = line.rstrip()
-		for enc in OUT_ENCODINGS:
+def output_str(fout, line):
+	"""outputs line to fout trying various encodings in case of encoding errors"""
+	if fout:
+		try:
+			fout.write(line)
+		except (UnicodeDecodeError, UnicodeEncodeError):
 			try:
-				line = line.encode(enc)
-				break
+				fout.write(line.encode(OUT_FILE_ENCODING))
 			except (UnicodeDecodeError, UnicodeEncodeError):
-				pass
-		print(line)
-	except (UnicodeDecodeError, UnicodeEncodeError):
-		print('!!! line cannot be encoded !!!')
-		pass
+				ok = 0
+				for enc in DB_ENCODINGS:
+					try:
+						line2 = line.decode(enc)
+						#fout.write(line2.encode(OUT_FILE_ENCODING))
+						fout.write(line2)
+						ok = 1
+						break
+					except (UnicodeDecodeError, UnicodeEncodeError):
+						pass
+				if not ok:
+					fout.write('!!! line cannot be encoded !!!\n')
+					fout.write(repr(line))
+		fout.write('\n')
+		fout.flush()
+
+
+def output_line(line, fout=None):
+	"""outputs line"""
+	line = line.rstrip()
+	output_str(fout, line)
+	output_str(sys.stdout, line)
 
 
 def select_qry(querystr):
@@ -238,21 +259,33 @@
 		return results
 
 
-def show_qry(title, querystr, fld_join = '\t', row_separator = None):
+def fld2str(fld_v):
+	"""converts field value into string"""
+	if type(fld_v) == type(1.1):
+		fld_v = '%s' % fld_v
+		if '.' in fld_v:
+			fld_v = fld_v.rstrip('0')
+			fld_v = fld_v.rstrip('.')
+	else:
+		fld_v = '%s' % fld_v
+	return fld_v
+
+
+def show_qry(title, querystr, fld_join='\t', row_separator=None):
 	"""prints rows from SELECT"""
 	output_line('\n\n')
 	output_line('--- %s ---' % title)
 	rs = select_qry(querystr)
 	if rs:
 		for row in rs:
-			output_line(fld_join.join([str(s) for s in row]))
+			output_line(fld_join.join([fld2str(s) for s in row]))
 			if row_separator:
 				output_line(row_separator)
 	else:
 		output_line(' -- NO DATA --')
 
 
-def show_qry_tables(title, querystr, fld_join = '\t', row_separator = None):
+def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
 	"""prints rows from SELECT"""
 	output_line('\n\n')
 	output_line('--- %s ---' % title)
@@ -269,8 +302,7 @@
 		output_line(' -- NO DATA --')
 
 
-
-def show_qry_ex(querystr, table, fld_join = '\t', row_separator = None):
+def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
 	rs = select_qry(querystr % table)
 	if rs:
 		for row in rs:
@@ -343,7 +375,7 @@
 	return row[0]
 
 
-def join_arg(arg_name, arg_type, mode = 'i'):
+def join_arg(arg_name, arg_type, mode='i'):
 	"""make string with procedure arguments"""
 	if mode == 'o':
 		out_s = 'OUT '
@@ -455,8 +487,7 @@
 	main('127.0.0.1:music:postgres:postgres')
 
 
-def main():
-	connect_string = sys.argv[1]
+def main(connect_string):
 	username = None
 	passwd = None
 	if (len(sys.argv) > 2):
@@ -492,4 +523,4 @@
 		#print('arg len: %d' % (len(sys.argv)))
 		print(USAGE)
 	else:
-		main()
+		main(sys.argv[1])

History