Welcome, guest | Sign In | My Account | Store | Cart

With PostgreSQL there is:

pg_dump --schema_only

but I wanted to compare my PostgreSQL and Oracle database schema dumped by: http://code.activestate.com/recipes/576534/

Python, 526 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
#!/usr/bin/env python
# -*- coding: utf8 -*-
__version__ = '$Id: schema_pg.py 1754 2014-02-14 08:57:52Z mn $'

# export PostgreSQL schema to text
# usable to compare databases that should be the same
#
# PostgreSQL schema info:
# http://www.alberton.info/postgresql_meta_info.html
#
#
# author: Michal Niklas

USAGE = 'usage:\n\tschema_pg.py connect_string\n\t\tconnect string:\n\t\t\thost:[port:]database:user:password\n\t\tor for ODBC:\n\t\t\tdatabase/user/passwd\n\t\tor (pyodbc)\n\t\t\tDriver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;'

import sys
import array
import exceptions

USE_JYTHON = 0

TABLES_ONLY = 0

try:
	from com.ziclix.python.sql import zxJDBC
	USE_JYTHON = 1
	USAGE = """usage:
\tschema_inf.py jdbcurl user passwd
example:
\tjython schema_py.py jdbc:postgresql://isof-test64:5434/gryfcard_mrb?stringtype=unspecified user passwd > db.schema 2> db.err
"""
except:
	USE_JYTHON = 0


DB_ENCODINGS = ('cp1250', 'iso8859_2', 'utf8')

OUT_FILE_ENCODING = 'UTF8'


TABLE_NAMES_SQL = """SELECT DISTINCT table_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1"""


TABLE_COLUMNS_SQL = """SELECT DISTINCT table_name, column_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2"""


TABLE_INFO_SQL = """SELECT table_name, column_name, ordinal_position, data_type, is_nullable, character_maximum_length, numeric_precision
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2
"""

PRIMARY_KEYS_INFO_SQL = """SELECT t.relname AS table_name, array_to_string(c.conkey, ' ') AS constraint_key
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
WHERE c.contype = 'p'
AND position('_' in t.relname ) <> 1
ORDER BY table_name;
"""


INDEXES_INFO_SQL = """SELECT relname, indisunique
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
     )
ORDER BY 1, 2
"""


INDEXES_COLUMNS_INFO1_SQL = """SELECT relname, indkey
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND pg_class.oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
)
ORDER BY 1
"""


INDEXES_COLUMNS_INFO2_SQL = """SELECT DISTINCT a.attname
     FROM pg_index c
LEFT JOIN pg_class t
       ON c.indrelid  = t.oid
LEFT JOIN pg_attribute a
       ON a.attrelid = t.oid
      AND a.attnum = ANY(indkey)
    WHERE t.relname = '%s'
      AND a.attnum = %s
"""


FOREIGN_KEYS_INFO_SQL = """SELECT t.relname AS table_name, t2.relname AS references_table
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
LEFT JOIN pg_class t2 ON c.confrelid = t2.oid
WHERE c.contype = 'f'
AND position('_' in t.relname ) <> 1
ORDER BY table_name, references_table
"""


DEFAULTS_INFO_SQL = """SELECT table_name, column_name, column_default
FROM information_schema.columns
WHERE column_default IS NOT NULL
AND position('_' in table_name) <> 1
ORDER BY table_name, column_name"""


VIEWS_INFO_SQL = """SELECT viewname, definition
FROM pg_views
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY viewname
"""


TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, action_timing, event_manipulation, action_statement
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3, 4, 5"""


_CONN = None

TABLES = []


def init_db_conn(connect_string, username, passwd):
	"""initializes db connections, can work with PyGres or psycopg2"""
	global _CONN
	try:
		dbinfo = connect_string
		print(dbinfo)
		if USE_JYTHON:
			_CONN = zxJDBC.connect(connect_string, username, passwd, 'org.postgresql.Driver')
		elif '/' in connect_string:
			import odbc
			_CONN = odbc.odbc(connect_string)
			print(_CONN)
		elif connect_string.startswith('Driver='):
			import pyodbc
			# Driver={PostgreSQL};Server=IP address;Port=5432;Database=myDataBase;Uid=myUsername;Pwd=myPassword;
			# Driver={PostgreSQL};Server=isof-test64;Port=5435;Database=isof_stable;Uid=postgres;Pwd=postgres;
			_CONN = pyodbc.connect(connect_string)
			print(_CONN)
		else:
			# 'host:[port]:database:user:password'
			arr = connect_string.split(':')
			if len(arr) > 4:
				host = '%s:%s' % (arr[0], arr[1])
				port = int(arr[1])
				dbname = arr[2]
				user = arr[3]
				passwd = arr[4]
			elif len(arr) == 4:
				host = arr[0]
				port = -1
				dbname = arr[1]
				user = arr[2]
				passwd = arr[3]
			else:
				raise exceptions.ImportError('Incorrect connect_string!\n\n%s' % (USAGE))
			if port > 0:
				host = host.split(':')[0]
				sport = 'port=%d' % (port)
			else:
				sport = ''
			dsn = "host=%s %s dbname=%s user=%s password=%s" % (host, sport, dbname, user, passwd)
			print(dsn)
			dbinfo = 'db: %s:%s' % (host, dbname)
			use_pgdb = 0
			try:
				import psycopg2
			except:
				try:
					import pgdb
					use_pgdb = 1
				except:
					raise exceptions.ImportError('No PostgreSQL library, install psycopg2 or PyGres!')
			if not _CONN:
				print(dbinfo)
				if use_pgdb:
					_CONN = pgdb.connect(database=dbname, host=host, user=user, password=passwd)
					print(_CONN)
				else:
					_CONN = psycopg2.connect(dsn)
					print(_CONN)
	except:
		ex = sys.exc_info()
		s = 'Exception: %s: %s\n%s' % (ex[0], ex[1], dbinfo)
		print(s)
		return None
	return _CONN


def db_conn():
	"""access to global db connection"""
	return _CONN


def output_str(fout, line):
	"""outputs line to fout trying various encodings in case of encoding errors"""
	if fout:
		try:
			fout.write(line)
		except (UnicodeDecodeError, UnicodeEncodeError):
			try:
				fout.write(line.encode(OUT_FILE_ENCODING))
			except (UnicodeDecodeError, UnicodeEncodeError):
				ok = 0
				for enc in DB_ENCODINGS:
					try:
						line2 = line.decode(enc)
						#fout.write(line2.encode(OUT_FILE_ENCODING))
						fout.write(line2)
						ok = 1
						break
					except (UnicodeDecodeError, UnicodeEncodeError):
						pass
				if not ok:
					fout.write('!!! line cannot be encoded !!!\n')
					fout.write(repr(line))
		fout.write('\n')
		fout.flush()


def output_line(line, fout=None):
	"""outputs line"""
	line = line.rstrip()
	output_str(fout, line)
	output_str(sys.stdout, line)


def select_qry(querystr):
	"""return rows from SELECT"""
	if querystr:
		cur = db_conn().cursor()
		cur.execute(querystr)
		results = cur.fetchall()
		cur.close()
		return results


def fld2str(fld_v):
	"""converts field value into string"""
	if type(fld_v) == type(1.1):
		fld_v = '%s' % fld_v
		if '.' in fld_v:
			fld_v = fld_v.rstrip('0')
			fld_v = fld_v.rstrip('.')
	else:
		fld_v = '%s' % fld_v
	return fld_v


def show_qry(title, querystr, fld_join='\t', row_separator=None):
	"""prints rows from SELECT"""
	output_line('\n\n')
	output_line('--- %s ---' % title)
	rs = select_qry(querystr)
	if rs:
		for row in rs:
			output_line(fld_join.join([fld2str(s) for s in row]))
			if row_separator:
				output_line(row_separator)
	else:
		output_line(' -- NO DATA --')


def show_qry_tables(title, querystr, fld_join='\t', row_separator=None):
	"""prints rows from SELECT"""
	output_line('\n\n')
	output_line('--- %s ---' % title)
	cnt = 0
	for tbl in TABLES:
		rs = select_qry(querystr % tbl)
		if rs:
			for row in rs:
				output_line(fld_join.join([str(s) for s in row]))
				cnt += 1
				if row_separator:
					output_line(row_separator)
	if cnt == 0:
		output_line(' -- NO DATA --')


def show_qry_ex(querystr, table, fld_join='\t', row_separator=None):
	rs = select_qry(querystr % table)
	if rs:
		for row in rs:
			output_line("%s%s%s" % (table, fld_join, fld_join.join([str(s) for s in row])))
			if row_separator:
				output_line(row_separator)


def init_session():
	"""place to change db session settings like locale"""
	pass


def show_tables():
	cur = db_conn().cursor()
	cur.execute(TABLE_NAMES_SQL)
	for row in cur.fetchall():
		TABLES.append(row[0])
	cur.close()
	show_qry('tables', TABLE_NAMES_SQL)
	show_qry('table columns', TABLE_COLUMNS_SQL)
	show_qry('columns', TABLE_INFO_SQL)


def show_primary_keys():
	show_qry('primary keys', PRIMARY_KEYS_INFO_SQL)


def show_indexes():
	output_line('\n\n')
	output_line('--- %s ---' % 'indexes')
	for tbl in TABLES:
		show_qry_ex(INDEXES_INFO_SQL, tbl)
	#show_qry('indexes columns', INDEXES_COLUMNS_INFO_SQL)
	output_line('\n\n')
	output_line('--- %s ---' % 'indexes columns')
	cur = db_conn().cursor()
	for tbl in TABLES:
		#print
		#print tbl
		cur.execute(INDEXES_COLUMNS_INFO1_SQL % tbl)
		for row in cur.fetchall():
			idxname = row[0]
			idxflds = '%s' % row[1]
			#print '\n', tbl, '\t', idxname, '\t', idxflds
			for fld in idxflds.split():
				sql = INDEXES_COLUMNS_INFO2_SQL % (tbl, fld)
				cur.execute(sql)
				for row in cur.fetchall():
					output_line("%s\t%s\t%s" % (tbl, idxname, row[0]))


def show_foreign_keys():
	show_qry('foreign keys', FOREIGN_KEYS_INFO_SQL)


def show_defaults():
	show_qry('defaults', DEFAULTS_INFO_SQL)


def show_views():
	show_qry('views', VIEWS_INFO_SQL, '\n', '\n\n')


def get_arg_type(at):
	"""returns type name from type id"""
	cur = db_conn().cursor()
	cur.execute("select pg_catalog.format_type('%s', NULL)" % at)
	row = cur.fetchone()
	return row[0]


def join_arg(arg_name, arg_type, mode='i'):
	"""make string with procedure arguments"""
	if mode == 'o':
		out_s = 'OUT '
	else:
		out_s = ''
	return '%s%s %s' % (out_s, arg_name, arg_type)


def get_subfields(subfield_fld):
	"""changes function arguments names into array of strings, for example:
	   ['code', 'status_str', 'err_desctiption'],
	   various drivers return it as various types"""
	if subfield_fld:
		subfield = subfield_fld
		if isinstance(subfield_fld, array.array):
			subfield = ','.join([str(s) for s in subfield_fld])
			#print('subfield arr: >>%s<<' % (subfield_fld))
			subfield = '%s' % (subfield)
		else:
			subfield = '%s' % (subfield_fld)
		subfield = subfield.replace('{', '')
		subfield = subfield.replace('}', '')
		subfield = subfield.replace('[', '')
		subfield = subfield.replace(']', '')
		subfield = subfield.replace("'", '')
		#print('subfield: >>%s<<' % (subfield))
		arr = subfield.split(',')
		if arr and len(arr) > 0:
			return arr
	return None


def show_procedures():
	argtypes_dict = {}
	output_line('\n\n --- procedures ---')
	cur = db_conn().cursor()
	cur.execute("""SELECT DISTINCT routine_name
        FROM information_schema.routines
        WHERE specific_schema NOT IN ('pg_catalog', 'information_schema')
        ORDER BY 1""")
	rows = cur.fetchall()
	for rt_row in rows:
		funname = rt_row[0]
		cur.execute("""SELECT CASE
         WHEN pg_proc.proretset
         THEN 'setof ' || pg_catalog.format_type(pg_proc.prorettype, NULL)
         ELSE pg_catalog.format_type(pg_proc.prorettype, NULL) END,
         pg_proc.proargtypes,
         pg_proc.proargnames,
         pg_proc.prosrc,
         pg_proc.proallargtypes,
         pg_proc.proargmodes,
         pg_language.lanname
    FROM pg_catalog.pg_proc
         JOIN pg_catalog.pg_namespace
           ON (pg_proc.pronamespace = pg_namespace.oid)
         JOIN pg_catalog.pg_language
           ON (pg_proc.prolang = pg_language.oid)
   WHERE pg_proc.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
     AND (pg_proc.proargtypes[0] IS NULL
      OR pg_proc.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
     AND NOT pg_proc.proisagg
     AND pg_proc.proname = '%s'
     AND pg_namespace.nspname = 'public'
     AND pg_catalog.pg_function_is_visible(pg_proc.oid);""" % funname)
		for row in cur.fetchall():
			ret_type = row[0]
			args = ''
			argtypes = []
			argmodes = get_subfields(row[5])
			argtypes_str = '%s' % row[1]
			proc_body = '%s' % row[3]
			lang = row[6]
			argtypes_nrs = get_subfields(row[4])
			if argtypes_str:
				if not argtypes_nrs:
					argtypes_nrs = argtypes_str.split()
					argmodes = 'i' * len(argtypes_nrs)
				for at in argtypes_nrs:
					if at:
						if not at in argtypes_dict.keys():
							argtypes_dict[at] = get_arg_type(at)
						ats = argtypes_dict[at]
						argtypes.append(ats)
				argnames = get_subfields(row[2])
				if argnames:
					args = ', '.join([join_arg(a, t, m) for (a, t, m) in zip(argnames, argtypes, argmodes)])

			output_line('\n\n -- >>> %s >>> --' % funname)
			lines = proc_body.split('\n')
			output_line('CREATE FUNCTION %s(%s) RETURNS %s\nAS $$' % (funname, args, ret_type))
			was_line = 0
			for line in lines:
				line = line.rstrip()
				if line or was_line:
					was_line = 1
					output_line(line)
			output_line('$$')
			output_line('  LANGUAGE %s;' % (lang))
			output_line('\n\n -- <<< %s <<< --' % funname)
	cur.close()


def show_triggers():
	show_qry('triggers', TRIGGERS_INFO_SQL, '\n', '\n\n')


def test():
	main('127.0.0.1:music:postgres:postgres')


def main(connect_string):
	username = None
	passwd = None
	if (len(sys.argv) > 2):
		username = sys.argv[2]
	if (len(sys.argv) > 3):
		passwd = sys.argv[3]
	if not init_db_conn(connect_string, username, passwd):
		print('Something is terribly wrong with db connection')
	else:
		init_session()
		show_tables()
		if not TABLES_ONLY:
			show_primary_keys()
			show_indexes()
			show_foreign_keys()
			show_defaults()
			show_views()
			show_triggers()
			show_procedures()
		print('\n\n--- the end ---')


if '--version' in sys.argv:
	print(__version__)
elif '--test' in sys.argv:
	test()
elif '--help' in sys.argv:
	print(USAGE)
elif __name__ == '__main__':
	if '--tables_only' in sys.argv:
		TABLES_ONLY = 1
	if len(sys.argv) < 2:
		#print('arg len: %d' % (len(sys.argv)))
		print(USAGE)
	else:
		main(sys.argv[1])

This recipe requires PyGres, psycopg2, odbc or pyodbc module to connect to PostgreSQL database. Or you can use Jython with JDBC driver.

2 comments

Michal Niklas (author) 12 years, 10 months ago  # | flag

I added similar tool for Informix: http://code.activestate.com/recipes/576621/

Michal Niklas (author) 12 years, 10 months ago  # | flag

2009-01-20: report from views, changed querying for FOREIGN KEY and PRIMARY KEY because INFORMATION_SCHEMA was very slow