Welcome, guest | Sign In | My Account | Store | Cart

This recipe provides a mechanism for remote, automated control of a network device via its command-line interface (CLI). It assumes that the CLI is accessed using Telnet. Furthermore, the device cannot be accessed directly, instead the user has to SSH to an intermediate jump host before Telneting to the device.

Python, 216 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import sys,re,time
import pexpect
import exceptions

from string import *
from telnetlib import Telnet

""" Define exceptions """

class TelnetError(exceptions.Exception):
    def __init__(self, args=None):
        self.args=args
        self.errmsg = ''
        for a in self.args:
            self.errmsg += str(a)

class JumpError(exceptions.Exception):
    def __init__(self, args=None):
        self.args=args
        self.errmsg = ''
        for a in self.args:
            self.errmsg += str(a)

class Jump:

    """ Initiate an SSH tunnel  
    """
    def __init__(self, host, env = {}):

        """ If env not passed explicitly, then 
            assume that its inherited from parent 
            class.
        """
        if env != {}:
           self.env = env

        self._jump_params()

        sshcmd = "ssh %s -L %s:%s:23" % \
            (self.jumpserver,self.jumpport,host)

        if self.DEBUG >= 3: print "ssh cmd: %s" % (sshcmd)

        """ Create SSH tunnel """
        self.tunnel = pexpect.spawn(sshcmd)

        if self.DEBUG >= 2: print "jump to %s:%s" % \
                (self.jumpserver,self.jumpport)

        if self.DEBUG >= 3: print "waiting for %s" % \
            (self.jumpprompt)

        """ Process response from jump server """
        expected_replies = [self.jumpprompt,\
            'Are you sure you want to continue connecting'\
            'ssword:']

        while 1:
            response = self.tunnel.expect(expected_replies)

            if response == 0:  # got prompt
                break

            """ Got ssh password, prompt, so no public 
                key encryption set up
            """
            if response == 1:  
                if self.env.has_key('SSHPASSWD'):
                    self.sshpassword = "%s\n" % \
                        (self.env['SSHPASSWD'])
                else:
                    raise JumpError, "no SSH Password"
                self._ssh_login()

            """ If prompted, add to known_hosts file """
            if response == 2:               
                self.tunnel.send("yes\n") 
     
        if self.DEBUG >= 1: print self.tunnel.before

        if self.DEBUG >= 3: print "tunnel %s:%s established" % \
                (self.jumpserver,self.jumpport)

    """ Process parameters needed to create the
        SSH tunnel to the Jump host
    """
    def _jump_params(self):

        """ Set debug level """
        if self.env.has_key('DEBUG'):
            self.DEBUG = int(self.env['DEBUG'])
        else:
            self.DEBUG = 0

        try:
            self.jumpserver = self.env['JMPSERVER']
            self.jumpport   = self.env['PORT']
            self.jumpprompt = self.env['JMPPROMPT']
        except:
            raise JumpError, "missing parameters"

        return 0

    """ Login with SSH password
    """
    def _ssh_login(self):
        if self.DEBUG >= 1: print self.tunnel.before
        self.tunnel.send(self.sshpassword)

class Climgmt(Jump):

    def __init__(self,host,env = {}):

        self.host = host

        """ If no environment passed as a parameter,
            then assume environment is inherited from
            parent class. 
        """
        if env != {}:
            self.env = env

        self._climgmt_params()

        """ Create ssh tunnel is jump server specified """
        if self.jumpserver != None:
            Jump.__init__(self,host)  

        if self.DEBUG >= 3: print "Telneting to %s on port %d" %  \
            (self.host, self.port)

        """ Initiate Telnet session """
        try:
            self.session = Telnet(self.host, self.port)
        except: 
            self.tunnel.close()
            raise TelnetError, "cannot establish telnet session"

    """ Set parameters 
    """
    def _climgmt_params(self):

        """ Debug level """
        if self.env.has_key('DEBUG'):
            self.DEBUG = int(self.env['DEBUG'])
        else:
            self.DEBUG = 0

        """ Device prompt """
        if self.env.has_key('PROMPT'):
            self.prompt = self.env['PROMPT'] 
        else:
            self.prommpt = None

        """ Jump server """
        if self.env.has_key('JMPSERVER'):
            self.jumpserver = self.env['JMPSERVER']
            self.host = "localhost"
        else:
            self.jumpserver = None

        """ The Telnet port maybe different because we are 
            going through a jump server (ssh tunnel).
        """
        if self.env.has_key('PORT'):
            self.port = int(self.env['PORT'])
        else:
            self.port = 23    # default Telnet port

        return 0

    """ Issue a command to the Telnet device  
    """
    def cmd(self, cmd, newprompt = None):

        if newprompt == None:
            prompt = self.prompt
        else:
            prompt = newprompt

        self.session.write(cmd+'\n')

        response = self.session.read_until(prompt,8)

        if self.DEBUG >= 3: print response

        lines = response.split('\n')
        lines = map(lambda i: strip(strip(i,'\r')), lines)

        return lines

    """ Close telnet session (and ssh tunnel)   
    """
    def close(self):

        if self.DEBUG >= 3: print "closing Telnet session <%s,%d>" % \
                (self.host, self.port)

        """ Close Telnet session """
        self.session.close()    # close telnet session

        """ Check to see if tunnel exists (note: must 
            find better way of check if tunnel exists) 
        """
        try:
            tunnel_exists = self.tunnel
        except AttributeError, e:
            return -1
        except NameError, e:
            return -1

        if self.DEBUG >= 3: print "closing ssh tunnel <%s,%s>" % \
                (self.jumpserver, self.jumpport)

        """ Close ssh tunnel (if it exists) """
        self.tunnel.close() # close tunnel 

The "Jump" class is responsible for the creating the SSH tunnel. It responds to "authenticity of host can't be established" messages (by answering yes). If an SSH password is not passed to the class, then it assumes public key encryption is set up.

The Jump class is inherited by the Climgmt class. which is responsible for the initiating the Telnet session (through the tunnel created by Jump). Climgmt also provides a "cmd" method for issuing commands to the device. The login sequence is specific to each device. The Device class below is for a Lucent stinger DSLAM.

class Device(Climgmt):

    """ Device specific login sequence
    """
    def __init__(self,device,env):

        """ Set regular expressions """
        self.pwderr1 = re.compile('Incorrect password')
        self.pwderr2 = re.compile('Login incorrect')
        self.blank = re.compile(' ')

        self.device = device
        self.env = env
        self._device_params()

        """ Call Climgmt to Telnet in """
        Climgmt.__init__(self,self.device,self.env)

    """ Login to device """
    self._device_login()

    """ Process device parameters
    """
    def _device_params(self):

        """ Get debug level """ 
        if self.env.has_key('DEBUG'):
            self.DEBUG = int(self.env['DEBUG'])
        else:
            self.DEBUG = 0

        self.exchange = self.device.split(".")[1].upper()

        """ Get Telnet password """ 
        if self.env.has_key('TELPASSWD'):
             self.telnet_password = self.env['TELPASSWD'] + '\n'
        else:
            self.telnet_password = '\n'

        """ Get device user name """
        if self.env.has_key('USER'):
             self.user_name   = self.env['USER'] + '\n'
        else:
            self.user_name   = 'admin\n'

        if self.env.has_key('PROMPT'):
            self.prompt   = self.env['PROMPT']
        else:
            self.prompt   = self.user_name[:-1] + '>'

        """ Get device user password """ 
        if self.env.has_key('USRPASSWD'):
            self.user_password = self.env['USRPASSWD'] + '\n'
        else:
            self.user_password = 'admin\n'

        return 0


    """ Device login sequnece                            
    """
    def _device_login(self):

        """ If correct response not received, then probably 
            Telnet password problem. In which case raise 
            TelnetError exception 
        """
        try:
            """ wait for password prompt """
            response = self.session.read_until('password:',3)
        except EOFError, e:
            """ if no prompt then probably Telnet error """
            self.close()
            raise TelnetError, "no password prompt"

        if self.DEBUG >= 1: print response

        """ Send Telnet password """
        self.session.write(self.telnet_password)

        try:
            """ Wait for stinger user prompt """
            response = self.session.read_until('User:', 2)
        except EOFError, e:
            self.close()
            raise TelnetError, "no User prompt"

        if self.DEBUG >= 1: print response

        """ Check if incorrect password response 
            is returned.
        """
        if self.pwderr1.search(response):
            self.close()
            raise TelnetError, "incorrect Telnet password"

        """ Send user name """
        self.session.write(self.user_name)

        try:
            response = self.session.read_until('Password:',4)
        except EOFError, e:
            self.close()
            raise TelnetError, "read_until error"

        if self.DEBUG >= 1: print response

        """ Send user password """
        self.session.write(self.user_password)

        """ Wait for device prompt """
        try:
            response = self.session.read_until(self.prompt,4)
        except EOFError, e:
            self.close()
            raise TelnetError, "read_until error"

        """ Check if incorrect password response is returned.
        """
        if self.pwderr2.search(response):
            self.close()
            raise TelnetError, "incorrect user password"

        if self.DEBUG >= 1: print response

        return 0

The login sequence for Stinger is quite complex, requiring a Telnet password, followed by a User name and a password for that user:

Enter password: *******
User: admin 
Password: *******

I suggest Telneting in to the device you wish to control and making a note of the login sequence. Modify the Device class to suite your device. I have removed all the device specific stuff from Jump and Climgmt. That way you can have a number of "Device" classes for different devices, e.g. Cisco etc.

This is how to use it:

if __name__=='__main__':

    env = {'JMPSERVER': 'juser@jump.example.org',
           'JMPPROMPT': 'juser$:',
           'TELPASSWD': 'letmein',
           'USER': 'admin',
           'USRPASSWD': 'letmein2',
           'PORT': 9012,
           'DEBUG': 0}

    t = Device('stinger1.example.org', env)
    output = t.cmd('date')
    print output
    t.close()

The env data dictionary contains all the parameters for the jump host, Telnet device and any user login details. The JMPPROMPT parameter is the bash prompt on the jump host. When the script sees this it knows that the tunnel has been established. The DEBUG option controls how verbose the output it. You will also need to know the prompt of the device. In the case of a Stinger, the prompt is a ">" appended to the user login name, e.g. "admin>". You may wish to pass the prompt as a separate element of the env data dictionary. Also note, prompts may change when you issues command - the cmd method allows you to pass an optional "prompt" parameter in order to cater for this.

In this example, an instance of Device is created and then the "date" command is issued. Each line of the result is returned in a list.