An example implementation of a Nagios script in Python for monitoring database servers via ODBC queries. The example tests included check the status of MS SQL Server replication and log shipping, but any status check that can be performed by a query can be implemented. This method is not intended as a replacement for SNMP monitoring, but as a way to implement custom logic checks.
New tests are implemented by adding the @nagios_test decorator to a function, and are called by passing the function name to the -t parameter. The usage text lists the available tests.
#!/usr/bin/env python26
# - Matt Keranen 2011
# FreeTDS: ./configure --with-tdsver=8.0 --enable-msdblib
# /etc/odbcinst.ini:
# [FreeTDS]
# Description = TDS driver for MSSQL
# Driver = /usr/local/lib/libtdsodbc.so
import getopt, platform, pyodbc, string, sys
# Nagios status names mapped to their numeric codes.
nagios_codes = {
    'OK': 0,
    'WARNING': 1,
    'CRITICAL': 2,
    'UNKNOWN': 3,
    'DEPENDENT': 4,
}
def usage():
    """Report the command-line syntax and the available tests as UNKNOWN."""
    script = format(sys.argv[0])
    text = 'usage: %s -h host -t test\n%s' % (script, test_list())
    nagios_return('UNKNOWN', text)
def nagios_return(code, response):
    """Print the plugin response and exit with the matching status code.

    code     -- status name, expected to be a key of nagios_codes
    response -- message text printed after the status name

    Does not return: the process exits via sys.exit() with the numeric
    value looked up in nagios_codes. A status name that is not in the
    table exits with the UNKNOWN code instead of raising KeyError.
    """
    print(code + ": " + response)
    # The callers in this file continue as if this call never returns
    # (e.g. they use variables that are only bound on the success path),
    # so the exit is what keeps them correct.
    sys.exit(nagios_codes.get(code, nagios_codes['UNKNOWN']))
def execute_sql(host, sql, database='master'):
    """Execute SQL against the specified database and fetch every row.

    host     -- server name placed in the ODBC connection string
    sql      -- statement to execute
    database -- database to connect to (default 'master')

    Returns the list of rows from fetchall() on success. On a connection
    or execution failure, returns a dict with 'code' ('CRITICAL') and
    'msg' keys instead, so callers must check the result type.
    """
    driver = 'SQL Server' if platform.system() == 'Darwin' else 'FreeTDS'
    # NOTE(review): credentials are hard-coded in the connection string;
    # consider moving them to configuration.
    cs = r'DRIVER={%s};SERVER=%s;DATABASE=%s;UID=nagios;PWD=N^gm3;' % (driver, host, database)

    try:
        cnx = pyodbc.connect(cs)
    except pyodbc.Error as e:
        # Join the exception arguments explicitly rather than iterating
        # the exception object, which only worked on Python 2.
        detail = '\n'.join(str(arg) for arg in e.args)
        return {'code': 'CRITICAL',
                'msg': "Unable to connect to SQL host %s\n%s" % (host, detail)}

    try:
        return cnx.cursor().execute(sql).fetchall()
    except pyodbc.Error as e:
        detail = '\n'.join(str(arg) for arg in e.args)
        return {'code': "CRITICAL",
                'msg': "Unable to execute SQL query on host %s\n%s" % (host, detail)}
    finally:
        # Rows are fully fetched at this point, so the connection can be
        # released. A failure while closing must not mask the result.
        try:
            cnx.close()
        except pyodbc.Error:
            pass
def get_func(test):
    """Determine if function name is valid and defined as a Nagios test.

    test -- name of a module-level function to look up

    Returns the function when it exists in this module and carries a
    truthy ``is_test`` attribute. Otherwise the problem is reported via
    nagios_return() with status UNKNOWN and, should that call return,
    None is returned.
    """
    try:
        func = getattr(sys.modules[__name__], test)
    except AttributeError:
        nagios_return('UNKNOWN', 'Invalid test name %s' % test)
        return None

    # Keep the requested name intact so the message reports the name,
    # not the value of the flag.
    if not getattr(func, 'is_test', False):
        nagios_return('UNKNOWN', '%s not defined as test' % test)
        return None
    return func
def test_list():
    """List of valid test names.

    Returns a string starting with 'tests:' followed by one line per
    module-level name whose object has a truthy ``is_test`` attribute,
    in the sorted order produced by dir(). The truthiness check is the
    same one get_func() applies, so every listed name is accepted there.
    """
    module = sys.modules[__name__]
    names = [name for name in dir(module)
             if getattr(getattr(module, name), 'is_test', False)]
    return 'tests:\n' + ''.join(' %s\n' % name for name in names)
def nagios_test(func):
    """Decorator: flag *func* as a Nagios test and hand it back unchanged."""
    setattr(func, 'is_test', True)
    return func
@nagios_test
def sql_ping(host):
    """Connect to SQL Server instance and return version string.

    host -- server name passed to execute_sql()

    Returns a dict with 'code' and 'msg' keys: the error dict from
    execute_sql() on failure, otherwise 'OK' with the first column of
    the first row as the message.
    """
    # The query was referenced but never defined; the docstring asks for
    # the version string.
    sql = 'SELECT @@VERSION'
    rows = execute_sql(host, sql)
    if isinstance(rows, dict):
        return rows
    message = rows[0][0]
    return {'code': 'OK', 'msg': message}
@nagios_test
def db_state(host):
    """Check state of each database.

    host -- server name passed to execute_sql()

    Queries sys.databases for rows with a non-zero state or user_access.
    State 6 counts as a warning, any other state above 3 as critical.
    A non-zero user_access is reported in the message but not counted.

    Returns a dict with 'code' ('OK', 'WARNING' or 'CRITICAL') and 'msg',
    or the error dict from execute_sql() on failure.
    """
    crit = warn = 0
    details = []
    sql = 'SELECT [name] db, user_access, user_access_desc, state, state_desc FROM sys.databases WHERE user_access > 0 OR state > 0'
    rows = execute_sql(host, sql)
    if isinstance(rows, dict):
        return rows

    for row in rows:
        if row.state == 6:
            warn += 1
            details.append('Database %s is %s\n' % (row.db, row.state_desc))
        elif row.state > 3:
            crit += 1
            details.append('Database %s is %s\n' % (row.db, row.state_desc))
        if row.user_access > 0:
            # Reported only; does not raise the warning count.
            details.append('Database %s in mode %s\n' % (row.db, row.user_access_desc))

    msg = ''.join(details)
    if crit > 0:
        code = 'CRITICAL'
        msg = 'Database state CRITICAL\n' + msg
    elif warn > 0:
        code = 'WARNING'
        msg = 'Database state warning\n' + msg
    else:
        # Only reached when nothing was flagged; without this branch the
        # result was always overwritten with OK.
        code = 'OK'
        msg = 'Databases OK\n' + msg
    return {'code': code, 'msg': msg}
@nagios_test
def replication_status(host):
    """Report transactional replication status.

    host -- server name passed to execute_sql()

    Runs sp_replmonitorhelppublication and sp_replmonitorhelpsubscription
    in the 'distribution' database and tracks the highest status and
    warning values seen across all rows. Highest status 6 is CRITICAL;
    highest status 5 or any non-zero warning is WARNING; otherwise OK.

    Returns a dict with 'code' and 'msg', or the error dict from
    execute_sql() on failure.
    """
    mstat = mwarn = 0
    details = []
    status = {0: 'Unknown', 1: 'Started', 2: 'Succeeded', 3: 'Active', 4: 'Idle', 5: 'Retrying', 6: 'Failed'}
    warning = {0: '', 1: '-Expiration ', 2: '-Latency '}

    def status_label(value):
        # Values missing from the table are shown as 'Unknown' rather
        # than raising KeyError.
        return status.get(value, 'Unknown')

    def warning_label(value):
        # NOTE(review): the warning column may carry values other than
        # 0/1/2 (e.g. combined flags) -- confirm against the procedure
        # documentation. Unmapped values are shown numerically.
        return warning.get(value, '-Warning(%s) ' % value)

    sql = 'EXEC dbo.sp_replmonitorhelppublication @publisher = @@SERVERNAME'
    rows = execute_sql(host, sql, 'distribution')
    if isinstance(rows, dict):
        return rows
    for row in rows:
        if row.status > mstat:
            mstat = row.status
        if row.warning > mwarn:
            mwarn = row.warning
        latency = 0 if row.worst_latency is None else row.worst_latency
        details.append('Pub:%s DB:%s Status:%s%s MaxLatency:%ss\n' % (
            row.publication, row.publisher_db,
            status_label(row.status), warning_label(row.warning), latency))

    sql = 'EXEC dbo.sp_replmonitorhelpsubscription @publisher = @@SERVERNAME, @publication_type = 0'  # Transactional replication
    rows = execute_sql(host, sql, 'distribution')
    if isinstance(rows, dict):
        return rows
    for row in rows:
        if row.status > mstat:
            mstat = row.status
        if row.warning > mwarn:
            mwarn = row.warning
        latency = '?' if row.latency is None else row.latency
        details.append('Sub:%s DB:%s Status:%s%s Latency:%ss\n' % (
            row.subscriber, row.subscriber_db,
            status_label(row.status), warning_label(row.warning), latency))

    msg = ''.join(details)
    if mstat == 6:
        code = 'CRITICAL'
        msg = 'Replication CRITICAL\n' + msg
    elif mstat == 5 or mwarn > 0:
        code = 'WARNING'
        msg = 'Replication WARNING\n' + msg
    else:
        # Without this branch the result was always overwritten with OK.
        code = 'OK'
        msg = 'Replication OK\n' + msg
    return {'code': code, 'msg': msg}
@nagios_test
def mirror_status(host):
    """Report mirror status.

    host -- server name passed to execute_sql()

    Lists every database with a non-NULL mirroring state. A state below
    2 counts as critical and state 3 as a warning.

    Returns a dict with 'code' ('OK', 'WARNING' or 'CRITICAL') and 'msg',
    or the error dict from execute_sql() on failure.
    """
    crit = warn = 0
    details = []
    # The database name comes from sys.databases (d.name) and is aliased
    # to dbname, which the row formatting below reads.
    sql = ("SELECT d.name dbname, m.mirroring_partner_instance partner, m.mirroring_state, m.mirroring_state_desc state\n"
           "FROM sys.databases d\n"
           "INNER JOIN sys.database_mirroring m ON m.database_id = d.database_id\n"
           "WHERE m.mirroring_state IS NOT NULL")
    rows = execute_sql(host, sql)
    if isinstance(rows, dict):
        return rows

    # Reference: 0 Suspended, 1 Disconnected, 2 Synchronizing,
    # 3 PendingFailover, 4 Synchronized
    for row in rows:
        if row.mirroring_state < 2:
            crit += 1
        if row.mirroring_state == 3:
            warn += 1
        details.append("DB:%s Partner:%s State:%s\n" % (row.dbname, row.partner, row.state))

    msg = ''.join(details)
    if crit > 0:
        code = 'CRITICAL'
        msg = 'Mirroring CRITICAL\n' + msg
    elif warn > 0:
        code = 'WARNING'
        msg = 'Mirroring warning\n' + msg
    else:
        # Without this branch the result was always overwritten with OK.
        code = 'OK'
        msg = 'Mirroring OK\n' + msg
    return {'code': code, 'msg': msg}
@nagios_test
def logship_status(host):
    """Report log shipping restore delta and latency.

    host -- server name passed to execute_sql()

    Reads msdb..log_shipping_monitor_secondary. A restore delta at or
    above restore_threshold counts as a warning; a restore latency at or
    above the same threshold counts as critical. Rows within both limits
    are listed informationally.

    Returns a dict with 'code' ('OK', 'WARNING' or 'CRITICAL') and 'msg',
    or the error dict from execute_sql() on failure.
    """
    crit = warn = 0
    details = []
    sql = ("SELECT secondary_server, secondary_database, primary_server, primary_database,\n"
           "last_restored_date, DATEDIFF(mi, last_restored_date, GETDATE()) last_restored_delta,\n"
           "last_restored_latency, restore_threshold\n"
           "FROM msdb..log_shipping_monitor_secondary")
    rows = execute_sql(host, sql)
    if isinstance(rows, dict):
        return rows

    for row in rows:
        if row.last_restored_delta >= row.restore_threshold:
            warn += 1
            details.append("Srv:%s DB:%s Restore delta %s exceeds threshold of %s\n" % (
                row.primary_server, row.primary_database,
                row.last_restored_delta, row.restore_threshold))
        if row.last_restored_latency >= row.restore_threshold:
            crit += 1
            details.append("Srv:%s DB:%s Latency of %s exceeds threshold of %s\n" % (
                row.primary_server, row.primary_database,
                row.last_restored_latency, row.restore_threshold))
        if row.last_restored_delta < row.restore_threshold and row.last_restored_latency < row.restore_threshold:
            details.append("Srv:%s DB:%s Latency:%s Restore delta:%s\n" % (
                row.primary_server, row.primary_database,
                row.last_restored_latency, row.last_restored_delta))

    msg = ''.join(details)
    if crit > 0:
        code = 'CRITICAL'
        msg = 'Log shipping CRITICAL\n' + msg
    elif warn > 0:
        code = 'WARNING'
        msg = 'Log shipping warning\n' + msg
    else:
        # Without this branch the result was always overwritten with OK.
        code = 'OK'
        msg = 'Log shipping OK\n' + msg
    return {'code': code, 'msg': msg}
def main():
    """Parse the -h/-t options, run the selected test and report it.

    Calls usage() when arguments are missing or invalid. Each usage()
    call is followed by an explicit return so that nothing below runs
    with unset variables, even if usage() itself returns.
    """
    if len(sys.argv) < 2:
        usage()
        return
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'h:t:')
    except getopt.GetoptError:
        usage()
        return

    host = test = None
    for o, value in opts:
        if o == "-h":
            host = value
        elif o == "-t":
            test = value
        else:
            usage()
            return

    if host is None or test is None:
        usage()
        return

    func = get_func(test)
    if func is None:
        # Lookup failed and has already been reported by get_func().
        return
    result = func(host)
    nagios_return(result['code'], result['msg'])
# Run the plugin only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
I get the following error when I try to execute this code.
CRITICAL: Unable to execute SQL query on host 23000 [23000] [FreeTDS][SQL Server]Cannot insert the value NULL into column 'publication_id', table 'tempdb.dbo.#tmp_replication_monitordata________________________________________________________________________________________000000055E3E'; column does not allow nulls. INSERT fails. (515) (SQLExecDirectW)
I have not encountered that error before, but it might be related to a replication misconfiguration. See as an example.
Hi Matt,
I saved this into a file called and when I run it I get below errors.
line 12: import: command not found line 14: Syntax error near unespected token '(' line 14: nagios_codes = dict (ok=0, WARNING=1, CRITICAL=2, UNKNOWN=3, DEPENDANT=4)'
Can you please suggest anything?
many thanks
The failure of the import command on line 12 (a Python keyword) and following token error indicate something is off with formatting of the code. Is there any mix of leading tabs and spaces? What Python version?
Hi Matt,
Many thanks for responding. The problem was that I was not running the code with the python command. Now it runs, but I get the error below. Can you please provide a help file? I just want to pass it the server name and database name from Nagios.
root@networkmonitor:/usr/local/nagios/libexec# python ./ --help File "./", line 31 except pyodbc.Error as e: return {'code':'CRITICAL', 'msg': "Unable to connect to SQL host %s\n%s" % (host, string.join(e,'\n'))} ^ SyntaxError: invalid syntax
usage howto please?
Thanks for this great script!
I have created a new version and also packaged it on pip.
I have refactored the code, and it is now compatible with Python 3.
Here is the link to the project to collaborate and use: