ActiveState Code

Recipe 576557: Dump PostgreSQL db schema to text


With PostgreSQL there is:

pg_dump --schema-only

but I wanted to compare my PostgreSQL schema with an Oracle database schema dumped by this recipe: http://code.activestate.com/recipes/576534/

Python
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
#!/usr/bin/python
# -*- coding: cp1250 -*-
# Version-control '$Id$' keyword string; printed when the script is run with --version.
__version__ = '$Id: schema_pg.py 559 2009-01-20 08:00:01Z mn $'

# Export a PostgreSQL schema to text.
# Usable to compare databases that should be the same.
#
# PostgreSQL schema info:
# http://www.alberton.info/postgresql_meta_info.html
#
#
# author: Michal Niklas

# Help text printed when the script is not called with exactly one argument.
USAGE = 'usage:\n\tschema_pg.py connect_string\n\t\tconnect string: host:database:user:password:opt:tty'

import sys
import exceptions

# Distinct table names of the 'public' schema, read from information_schema.columns.
# Names that start with '_' are skipped (position('_' in table_name) <> 1).
TABLE_NAMES_SQL = """SELECT DISTINCT table_name
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1"""

# Column details (position, type, nullability, length, precision) for the same
# set of tables as TABLE_NAMES_SQL. Rows are ordered by table name and then by
# column name (ORDER BY 1, 2), not by ordinal position.
TABLE_INFO_SQL = """SELECT table_name, column_name, ordinal_position, data_type, is_nullable, character_maximum_length, numeric_precision
FROM information_schema.columns
WHERE table_schema='public'
AND position('_' in table_name) <> 1
ORDER BY 1, 2
"""

# One row per primary key constraint (contype = 'p'): the table name and the
# constraint's conkey array joined with spaces.
# NOTE(review): conkey presumably holds column numbers rather than column
# names - confirm against the pg_constraint documentation.
# NOTE(review): unlike TABLE_NAMES_SQL there is no schema filter here.
PRIMARY_KEYS_INFO_SQL = """SELECT t.relname AS table_name, array_to_string(c.conkey, ' ') AS constraint_key
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
WHERE c.contype = 'p'
AND position('_' in t.relname ) <> 1
ORDER BY table_name;
"""


# Template with one '%s' placeholder (the table name), filled in with the
# Python % operator. Selects the name and the indisunique flag of every index
# of that table whose indisprimary flag is not 't'.
# NOTE(review): the outer "AND oid IN" uses an unqualified oid, while
# INDEXES_COLUMNS_INFO1_SQL writes pg_class.oid - confirm it is unambiguous
# on the target server.
INDEXES_INFO_SQL = """SELECT relname, indisunique
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
     )
ORDER BY 1, 2
"""


# Template with one '%s' placeholder (the table name). Selects the name and the
# indkey value of every index of that table whose indisprimary flag is not 't'.
# show_indexes() splits indkey on whitespace and looks each item up with
# INDEXES_COLUMNS_INFO2_SQL.
INDEXES_COLUMNS_INFO1_SQL = """SELECT relname, indkey
FROM pg_class, pg_index
WHERE pg_class.oid = pg_index.indexrelid
AND pg_class.oid IN (
    SELECT indexrelid
      FROM pg_index, pg_class
     WHERE pg_class.relname='%s'
       AND pg_class.oid=pg_index.indrelid
       AND indisprimary != 't'
)
ORDER BY 1
"""


# Template with two placeholders: '%s' is the table name and %s is an attribute
# number taken from an index's indkey. Selects the name of the attribute of
# that table with that number, provided it is part of some index of the table.
INDEXES_COLUMNS_INFO2_SQL = """SELECT DISTINCT a.attname
     FROM pg_index c
LEFT JOIN pg_class t
       ON c.indrelid  = t.oid
LEFT JOIN pg_attribute a
       ON a.attrelid = t.oid
      AND a.attnum = ANY(indkey)
    WHERE t.relname = '%s'
      AND a.attnum = %s
"""


# One row per foreign key constraint (contype = 'f'): the constrained table and
# the table it references. Tables whose name starts with '_' are skipped.
FOREIGN_KEYS_INFO_SQL = """SELECT t.relname AS table_name, t2.relname AS references_table
FROM pg_constraint c
LEFT JOIN pg_class t  ON c.conrelid  = t.oid
LEFT JOIN pg_class t2 ON c.confrelid = t2.oid
WHERE c.contype = 'f'
AND position('_' in t.relname ) <> 1
ORDER BY table_name, references_table
"""


# Every column that has a default value, with the default expression.
# NOTE(review): unlike TABLE_INFO_SQL this query does not filter on
# table_schema='public' - confirm whether other schemas are wanted here.
DEFAULTS_INFO_SQL = """SELECT table_name, column_name, column_default
FROM information_schema.columns
WHERE column_default IS NOT NULL
AND position('_' in table_name) <> 1
ORDER BY table_name, column_name"""


# Name and definition of every view outside the pg_catalog and
# information_schema schemas, ordered by view name.
VIEWS_INFO_SQL = """SELECT viewname, definition
FROM pg_views
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY viewname
"""


# Triggers outside the pg_catalog and information_schema schemas: table,
# trigger name, orientation, timing, triggering event and the action statement.
# NOTE(review): the condition_timing column may be named differently
# (action_timing) on newer PostgreSQL releases - verify against the target
# server version.
TRIGGERS_INFO_SQL = """SELECT event_object_table, trigger_name, action_orientation, condition_timing, event_manipulation, action_statement
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY 1, 2, 3, 4, 5"""


# Global db connection; set by init_db_conn() and read through db_conn().
_CONN = None

# Table names collected by show_tables(); the per-table reports iterate over it.
TABLES = []

def _dsn_quote(value):
	"""Quote one value for a libpq 'key=value' connection string.

	The value is wrapped in single quotes; backslashes and single quotes
	inside it are escaped with a backslash.
	"""
	return "'%s'" % value.replace('\\', '\\\\').replace("'", "\\'")


def init_db_conn(connect_string):
	"""Open the global db connection (only once) and return it.

	The pgdb module is tried first, then psycopg2.

	connect_string -- 'host:database:user:password:opt:tty'; trailing
	                  fields may be left out

	Returns the connection, or None when no driver can be imported or the
	connection attempt fails; in that case the error is printed.
	"""
	global _CONN
	# Pad with empty fields so that a short connect string still unpacks.
	padded = connect_string + '::::'
	(host, dbname, user, password, _) = padded.split(':', 4)
	# Leave out empty fields and quote the rest: libpq skips blanks after '=',
	# so a bare "user= password=x" would be read as user 'password=x'.
	dsn_fields = (('host', host), ('dbname', dbname), ('user', user), ('password', password))
	dsn = ' '.join(['%s=%s' % (key, _dsn_quote(val)) for (key, val) in dsn_fields if val])
	dbinfo = 'db: %s:%s' % (host, dbname)
	try:
		use_pgdb = False
		try:
			import pgdb
			use_pgdb = True
		except ImportError:
			try:
				import psycopg2
			except ImportError:
				raise ImportError('No PostgreSQL library, install psycopg2 or PyGres!')
		if not _CONN:
			print(dbinfo)
			if use_pgdb:
				_CONN = pgdb.connect(padded)
			else:
				_CONN = psycopg2.connect(dsn)
	except Exception:
		# Report and signal failure with None; KeyboardInterrupt and
		# SystemExit are deliberately not caught here.
		ex = sys.exc_info()
		print('Exception: %s: %s\n%s' % (ex[0], ex[1], dbinfo))
		return None
	return _CONN


def db_conn():
	"""Return the module-wide connection object (None until init_db_conn() has set it)."""
	connection = _CONN
	return connection


def select_qry(querystr):
	"""Run a SELECT on the global connection and return all of its rows.

	querystr -- complete SQL text

	Returns the result of cursor.fetchall(), or None when querystr is empty
	or None (nothing is executed in that case).
	"""
	if not querystr:
		return None
	cur = db_conn().cursor()
	try:
		cur.execute(querystr)
		return cur.fetchall()
	finally:
		# release the cursor even when the query fails
		cur.close()


def show_qry(title, querystr, fld_join = '\t', row_separator = None):
	"""Print a titled report with one line per row returned by querystr.

	Fields of a row are joined with fld_join; when row_separator is given it
	is printed after every row. An empty result prints a NO DATA marker.
	"""
	print('\n\n')
	print('--- %s ---' % title)
	records = select_qry(querystr)
	if not records:
		print(' -- NO DATA --')
		return
	for record in records:
		print(fld_join.join(map(str, record)))
		if row_separator:
			print(row_separator)


def show_qry_tables(title, querystr, fld_join = '\t', row_separator = None):
	"""Print a titled report built by running querystr once per table in TABLES.

	querystr is a template; the table name is inserted with the % operator.
	A NO DATA marker is printed when no table produced any row.
	"""
	print('\n\n')
	print('--- %s ---' % title)
	printed = 0
	for table_name in TABLES:
		# an empty or missing result contributes no lines
		for record in select_qry(querystr % table_name) or ():
			print(fld_join.join(map(str, record)))
			printed += 1
			if row_separator:
				print(row_separator)
	if not printed:
		print(' -- NO DATA --')



def show_qry_ex(querystr, table, fld_join = '\t', row_separator = None):
	"""Print the rows of a per-table query, each line prefixed with the table name.

	querystr is a template; table is inserted with the % operator. Nothing is
	printed when the query returns no rows.
	"""
	records = select_qry(querystr % table)
	if not records:
		return
	line_prefix = '%s%s' % (table, fld_join)
	for record in records:
		print(line_prefix + fld_join.join(map(str, record)))
		if row_separator:
			print(row_separator)


def init_session():
	"""Hook for db session settings such as locale; does nothing at present."""
	return None


def show_tables():
	"""Fill the global TABLES list and print the 'tables' and 'columns' reports.

	TABLES is replaced rather than appended to, so calling this function
	more than once does not duplicate table names.
	"""
	rows = select_qry(TABLE_NAMES_SQL)
	# slice assignment keeps the same list object that other functions read
	TABLES[:] = [row[0] for row in rows]
	show_qry('tables', TABLE_NAMES_SQL)
	show_qry('columns', TABLE_INFO_SQL)


def show_primary_keys():
	"""Print the 'primary keys' report."""
	show_qry(title = 'primary keys', querystr = PRIMARY_KEYS_INFO_SQL)


def show_indexes():
	"""Print the 'indexes' and 'indexes columns' reports for every table in TABLES.

	The first report lists table, index name and uniqueness flag. The second
	lists table, index name and column name, one line per column found for
	the items of the index's indkey value.
	"""
	print('\n\n')
	print('--- %s ---' % 'indexes')
	for tbl in TABLES:
		show_qry_ex(INDEXES_INFO_SQL, tbl)
	print('\n\n')
	print('--- %s ---' % 'indexes columns')
	cur = db_conn().cursor()
	try:
		for tbl in TABLES:
			cur.execute(INDEXES_COLUMNS_INFO1_SQL % tbl)
			# fetchall() materializes the rows, so the cursor can be reused below
			for (idxname, idxflds) in cur.fetchall():
				for fld in idxflds.split():
					cur.execute(INDEXES_COLUMNS_INFO2_SQL % (tbl, fld))
					for col_row in cur.fetchall():
						print("%s\t%s\t%s" % (tbl, idxname, col_row[0]))
	finally:
		# release the cursor even when one of the queries fails
		cur.close()


def show_foreign_keys():
	"""Print the 'foreign keys' report."""
	show_qry(title = 'foreign keys', querystr = FOREIGN_KEYS_INFO_SQL)


def show_defaults():
	"""Print the 'defaults' report (columns that have a default value)."""
	show_qry(title = 'defaults', querystr = DEFAULTS_INFO_SQL)


def show_views():
	"""Print the 'views' report: each field on its own line, blank lines after each view."""
	show_qry('views', VIEWS_INFO_SQL, fld_join = '\n', row_separator = '\n\n')


def get_arg_type(at):
	"""Return the type name for type id `at`, as given by pg_catalog.format_type.

	at -- type id; it is inserted into the SQL text as a quoted literal
	"""
	cur = db_conn().cursor()
	try:
		cur.execute("select pg_catalog.format_type('%s', NULL)" % at)
		return cur.fetchone()[0]
	finally:
		# release the cursor even when the lookup fails
		cur.close()


def join_arg(arg_name, arg_type, mode = 'i'):
	"""Return 'name type' for one procedure argument, prefixed with 'OUT ' when mode is 'o'."""
	prefix = 'OUT ' if mode == 'o' else ''
	return prefix + ('%s %s' % (arg_name, arg_type))


def get_subfields(subfield):
	"""Split an array value into a list of element strings.

	'{23, 24, 23}' -> ['23', '24', '23']
	'{}'           -> []

	subfield -- array in its '{a,b,c}' text form, or a list/tuple in case
	            the db driver has already decoded the array

	Whitespace around elements is removed. Returns None when subfield is
	None or empty.
	"""
	if not subfield:
		return None
	if isinstance(subfield, (list, tuple)):
		return [str(item) for item in subfield]
	inner = subfield.replace('{', '').replace('}', '')
	if not inner.strip():
		# an empty array has no elements (a plain split would give [''])
		return []
	return [item.strip() for item in inner.split(',')]


def _format_proc_args(argnames, argtypes, argmodes):
	"""Build the argument list text; arguments without a name are shown by type only."""
	if not argnames:
		argnames = [''] * len(argtypes)
	if not argmodes:
		argmodes = 'i' * len(argtypes)
	parts = []
	for (name, type_name, mode) in zip(argnames, argtypes, argmodes):
		if name:
			parts.append(join_arg(name, type_name, mode))
		elif mode == 'o':
			parts.append('OUT %s' % type_name)
		else:
			parts.append(type_name)
	return ', '.join(parts)


def show_procedures():
	"""Print a CREATE FUNCTION style listing of every routine outside the system schemas.

	Routine names come from information_schema.routines; for each name the
	matching pg_proc rows of the 'public' schema supply return type,
	argument names/types/modes, source text and language. Leading blank
	lines of the source text are skipped and trailing whitespace is removed
	from every line.

	NOTE(review): the query relies on pg_proc.proisagg - verify that this
	column exists on the target server version.
	"""
	routine_names_sql = """SELECT DISTINCT routine_name
        FROM information_schema.routines
        WHERE specific_schema NOT IN ('pg_catalog', 'information_schema')
        ORDER BY 1"""
	# template: '%s' is the function name
	proc_info_sql = """SELECT CASE
         WHEN pg_proc.proretset
         THEN 'setof ' || pg_catalog.format_type(pg_proc.prorettype, NULL)
         ELSE pg_catalog.format_type(pg_proc.prorettype, NULL) END,
         pg_proc.proargtypes,
         pg_proc.proargnames,
         pg_proc.prosrc,
         pg_proc.proallargtypes,
         pg_proc.proargmodes,
         pg_language.lanname
    FROM pg_catalog.pg_proc
         JOIN pg_catalog.pg_namespace
           ON (pg_proc.pronamespace = pg_namespace.oid)
         JOIN pg_catalog.pg_language
           ON (pg_proc.prolang = pg_language.oid)
   WHERE pg_proc.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
     AND (pg_proc.proargtypes[0] IS NULL
      OR pg_proc.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
     AND NOT pg_proc.proisagg
     AND pg_proc.proname = '%s'
     AND pg_namespace.nspname = 'public'
     AND pg_catalog.pg_function_is_visible(pg_proc.oid);"""
	# cache: type id -> type name, so each type is looked up only once
	argtypes_dict = {}
	print('\n\n --- procedures ---')
	cur = db_conn().cursor()
	try:
		cur.execute(routine_names_sql)
		rows = cur.fetchall()
		for rt_row in rows:
			funname = rt_row[0]
			cur.execute(proc_info_sql % funname)
			for row in cur.fetchall():
				ret_type = row[0]
				args = ''
				argtypes = []
				argmodes = get_subfields(row[5])
				argtypes_str = row[1]
				proc_body = row[3]
				lang = row[6]
				argtypes_nrs = get_subfields(row[4])
				if argtypes_str:
					if not argtypes_nrs:
						# no proallargtypes: every argument is an input argument
						argtypes_nrs = argtypes_str.split()
						argmodes = 'i' * len(argtypes_nrs)
					for at in argtypes_nrs:
						if at not in argtypes_dict:
							argtypes_dict[at] = get_arg_type(at)
						argtypes.append(argtypes_dict[at])
					# unnamed arguments are still listed (by type), so that
					# overloaded functions stay distinguishable in the dump
					args = _format_proc_args(get_subfields(row[2]), argtypes, argmodes)

				print('\n\n -- >>> %s >>> --' % funname)
				print('CREATE FUNCTION %s(%s) RETURNS %s\nAS $$' % (funname, args, ret_type))
				body_started = False
				for line in proc_body.split('\n'):
					line = line.rstrip()
					# skip blank lines only until the first non-blank line
					if line or body_started:
						body_started = True
						print(line)
				print('$$')
				print('  LANGUAGE %s;' % (lang))
				print('\n\n -- <<< %s <<< --' % funname)
	finally:
		# release the cursor even when one of the queries fails
		cur.close()


def show_triggers():
	"""Print the 'triggers' report: each field on its own line, blank lines after each trigger."""
	show_qry('triggers', TRIGGERS_INFO_SQL, fld_join = '\n', row_separator = '\n\n')


def test():
	"""Run the whole dump with a hard-coded connect string for a local 'music' database."""
	local_connect_string = '127.0.0.1:music:postgres:postgres'
	main(local_connect_string)


def main(connect_string):
	"""Connect with connect_string and print every schema report in a fixed order.

	Prints an error line and does nothing else when the connection cannot
	be initialized.
	"""
	if not init_db_conn(connect_string):
		print('Something is terribly wrong with db connection')
		return
	# show_tables must run before show_indexes: it fills the TABLES list
	report_steps = (
		init_session,
		show_tables,
		show_primary_keys,
		show_indexes,
		show_foreign_keys,
		show_defaults,
		show_views,
		show_triggers,
		show_procedures,
	)
	for step in report_steps:
		step()


# Command line dispatch.
# NOTE(review): the --version and --test checks are not guarded by
# __name__ == '__main__', so they also take effect when this file is imported
# by a program whose own sys.argv contains one of these switches.
if '--version' in sys.argv:
	print __version__
elif '--test' in sys.argv:
	# run against the hard-coded connect string in test()
	test()
elif __name__ == '__main__':
	# exactly one argument (the connect string) is expected
	if len(sys.argv) != 2:
		print USAGE
	else:
		main(sys.argv[1])

Discussion

This recipe requires the PyGreSQL (pgdb) or psycopg2 module to connect to a PostgreSQL database.

Comments

  1. 1. At 2:33 a.m. on 19 jan 2009, Michal Niklas (the author) said:

    I added a similar tool for Informix: http://code.activestate.com/recipes/576621/

  2. 2. At 9:59 p.m. on 20 jan 2009, Michal Niklas (the author) said:

    2009-01-20: added a report of views; changed the queries for FOREIGN KEY and PRIMARY KEY because INFORMATION_SCHEMA was very slow

Sign in to comment