This recipe provides a mechanism for remote, automated control of a network device via its command-line interface (CLI). It assumes that the CLI is accessed using Telnet. Furthermore, the device cannot be accessed directly, instead the user has to SSH to an intermediate jump host before Telneting to the device.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | import sys,re,time
import pexpect
import exceptions
from string import *
from telnetlib import Telnet
""" Define exceptions """
class TelnetError(exceptions.Exception):
def __init__(self, args=None):
self.args=args
self.errmsg = ''
for a in self.args:
self.errmsg += str(a)
class JumpError(exceptions.Exception):
def __init__(self, args=None):
self.args=args
self.errmsg = ''
for a in self.args:
self.errmsg += str(a)
class Jump:
""" Initiate an SSH tunnel
"""
def __init__(self, host, env = {}):
""" If env not passed explicitly, then
assume that its inherited from parent
class.
"""
if env != {}:
self.env = env
self._jump_params()
sshcmd = "ssh %s -L %s:%s:23" % \
(self.jumpserver,self.jumpport,host)
if self.DEBUG >= 3: print "ssh cmd: %s" % (sshcmd)
""" Create SSH tunnel """
self.tunnel = pexpect.spawn(sshcmd)
if self.DEBUG >= 2: print "jump to %s:%s" % \
(self.jumpserver,self.jumpport)
if self.DEBUG >= 3: print "waiting for %s" % \
(self.jumpprompt)
""" Process response from jump server """
expected_replies = [self.jumpprompt,\
'Are you sure you want to continue connecting'\
'ssword:']
while 1:
response = self.tunnel.expect(expected_replies)
if response == 0: # got prompt
break
""" Got ssh password, prompt, so no public
key encryption set up
"""
if response == 1:
if self.env.has_key('SSHPASSWD'):
self.sshpassword = "%s\n" % \
(self.env['SSHPASSWD'])
else:
raise JumpError, "no SSH Password"
self._ssh_login()
""" If prompted, add to known_hosts file """
if response == 2:
self.tunnel.send("yes\n")
if self.DEBUG >= 1: print self.tunnel.before
if self.DEBUG >= 3: print "tunnel %s:%s established" % \
(self.jumpserver,self.jumpport)
""" Process parameters needed to create the
SSH tunnel to the Jump host
"""
def _jump_params(self):
""" Set debug level """
if self.env.has_key('DEBUG'):
self.DEBUG = int(self.env['DEBUG'])
else:
self.DEBUG = 0
try:
self.jumpserver = self.env['JMPSERVER']
self.jumpport = self.env['PORT']
self.jumpprompt = self.env['JMPPROMPT']
except:
raise JumpError, "missing parameters"
return 0
""" Login with SSH password
"""
def _ssh_login(self):
if self.DEBUG >= 1: print self.tunnel.before
self.tunnel.send(self.sshpassword)
class Climgmt(Jump):
def __init__(self,host,env = {}):
self.host = host
""" If no environment passed as a parameter,
then assume environment is inherited from
parent class.
"""
if env != {}:
self.env = env
self._climgmt_params()
""" Create ssh tunnel is jump server specified """
if self.jumpserver != None:
Jump.__init__(self,host)
if self.DEBUG >= 3: print "Telneting to %s on port %d" % \
(self.host, self.port)
""" Initiate Telnet session """
try:
self.session = Telnet(self.host, self.port)
except:
self.tunnel.close()
raise TelnetError, "cannot establish telnet session"
""" Set parameters
"""
def _climgmt_params(self):
""" Debug level """
if self.env.has_key('DEBUG'):
self.DEBUG = int(self.env['DEBUG'])
else:
self.DEBUG = 0
""" Device prompt """
if self.env.has_key('PROMPT'):
self.prompt = self.env['PROMPT']
else:
self.prommpt = None
""" Jump server """
if self.env.has_key('JMPSERVER'):
self.jumpserver = self.env['JMPSERVER']
self.host = "localhost"
else:
self.jumpserver = None
""" The Telnet port maybe different because we are
going through a jump server (ssh tunnel).
"""
if self.env.has_key('PORT'):
self.port = int(self.env['PORT'])
else:
self.port = 23 # default Telnet port
return 0
""" Issue a command to the Telnet device
"""
def cmd(self, cmd, newprompt = None):
if newprompt == None:
prompt = self.prompt
else:
prompt = newprompt
self.session.write(cmd+'\n')
response = self.session.read_until(prompt,8)
if self.DEBUG >= 3: print response
lines = response.split('\n')
lines = map(lambda i: strip(strip(i,'\r')), lines)
return lines
""" Close telnet session (and ssh tunnel)
"""
def close(self):
if self.DEBUG >= 3: print "closing Telnet session <%s,%d>" % \
(self.host, self.port)
""" Close Telnet session """
self.session.close() # close telnet session
""" Check to see if tunnel exists (note: must
find better way of check if tunnel exists)
"""
try:
tunnel_exists = self.tunnel
except AttributeError, e:
return -1
except NameError, e:
return -1
if self.DEBUG >= 3: print "closing ssh tunnel <%s,%s>" % \
(self.jumpserver, self.jumpport)
""" Close ssh tunnel (if it exists) """
self.tunnel.close() # close tunnel
|
The "Jump" class is responsible for the creating the SSH tunnel. It responds to "authenticity of host can't be established" messages (by answering yes). If an SSH password is not passed to the class, then it assumes public key encryption is set up.
The Jump class is inherited by the Climgmt class. which is responsible for the initiating the Telnet session (through the tunnel created by Jump). Climgmt also provides a "cmd" method for issuing commands to the device. The login sequence is specific to each device. The Device class below is for a Lucent stinger DSLAM.
class Device(Climgmt):
""" Device specific login sequence
"""
def __init__(self,device,env):
""" Set regular expressions """
self.pwderr1 = re.compile('Incorrect password')
self.pwderr2 = re.compile('Login incorrect')
self.blank = re.compile(' ')
self.device = device
self.env = env
self._device_params()
""" Call Climgmt to Telnet in """
Climgmt.__init__(self,self.device,self.env)
""" Login to device """
self._device_login()
""" Process device parameters
"""
def _device_params(self):
""" Get debug level """
if self.env.has_key('DEBUG'):
self.DEBUG = int(self.env['DEBUG'])
else:
self.DEBUG = 0
self.exchange = self.device.split(".")[1].upper()
""" Get Telnet password """
if self.env.has_key('TELPASSWD'):
self.telnet_password = self.env['TELPASSWD'] + '\n'
else:
self.telnet_password = '\n'
""" Get device user name """
if self.env.has_key('USER'):
self.user_name = self.env['USER'] + '\n'
else:
self.user_name = 'admin\n'
if self.env.has_key('PROMPT'):
self.prompt = self.env['PROMPT']
else:
self.prompt = self.user_name[:-1] + '>'
""" Get device user password """
if self.env.has_key('USRPASSWD'):
self.user_password = self.env['USRPASSWD'] + '\n'
else:
self.user_password = 'admin\n'
return 0
""" Device login sequnece
"""
def _device_login(self):
""" If correct response not received, then probably
Telnet password problem. In which case raise
TelnetError exception
"""
try:
""" wait for password prompt """
response = self.session.read_until('password:',3)
except EOFError, e:
""" if no prompt then probably Telnet error """
self.close()
raise TelnetError, "no password prompt"
if self.DEBUG >= 1: print response
""" Send Telnet password """
self.session.write(self.telnet_password)
try:
""" Wait for stinger user prompt """
response = self.session.read_until('User:', 2)
except EOFError, e:
self.close()
raise TelnetError, "no User prompt"
if self.DEBUG >= 1: print response
""" Check if incorrect password response
is returned.
"""
if self.pwderr1.search(response):
self.close()
raise TelnetError, "incorrect Telnet password"
""" Send user name """
self.session.write(self.user_name)
try:
response = self.session.read_until('Password:',4)
except EOFError, e:
self.close()
raise TelnetError, "read_until error"
if self.DEBUG >= 1: print response
""" Send user password """
self.session.write(self.user_password)
""" Wait for device prompt """
try:
response = self.session.read_until(self.prompt,4)
except EOFError, e:
self.close()
raise TelnetError, "read_until error"
""" Check if incorrect password response is returned.
"""
if self.pwderr2.search(response):
self.close()
raise TelnetError, "incorrect user password"
if self.DEBUG >= 1: print response
return 0
The login sequence for Stinger is quite complex, requiring a Telnet password, followed by a User name and a password for that user:
Enter password: *******
User: admin
Password: *******
I suggest Telneting in to the device you wish to control and making a note of the login sequence. Modify the Device class to suite your device. I have removed all the device specific stuff from Jump and Climgmt. That way you can have a number of "Device" classes for different devices, e.g. Cisco etc.
This is how to use it:
if __name__=='__main__':
env = {'JMPSERVER': 'juser@jump.example.org',
'JMPPROMPT': 'juser$:',
'TELPASSWD': 'letmein',
'USER': 'admin',
'USRPASSWD': 'letmein2',
'PORT': 9012,
'DEBUG': 0}
t = Device('stinger1.example.org', env)
output = t.cmd('date')
print output
t.close()
The env data dictionary contains all the parameters for the jump host, Telnet device and any user login details. The JMPPROMPT parameter is the bash prompt on the jump host. When the script sees this it knows that the tunnel has been established. The DEBUG option controls how verbose the output it. You will also need to know the prompt of the device. In the case of a Stinger, the prompt is a ">" appended to the user login name, e.g. "admin>". You may wish to pass the prompt as a separate element of the env data dictionary. Also note, prompts may change when you issues command - the cmd method allows you to pass an optional "prompt" parameter in order to cater for this.
In this example, an instance of Device is created and then the "date" command is issued. Each line of the result is returned in a list.