Welcome, guest | Sign In | My Account | Store | Cart

Use telnetlib to execute the shell command from localhost.

An example to show the funciton. Output the result to result.log.

from PyRemoteControl import RemoteShellCommand

host info

host_ip = '192.168.32.72' user_name = 'fw' password = 'fw' result_file = 'result.log'

command List

cmdList = [ 'cd' , 'll' ]

init

cursor = RemoteShellCommand( host_ip , user_name , password , result_file ) cursor.AddPrompt( '[fw@localhost .]\$ ') cursor.AddPrompt( '[root@localhost .]# ' )

connect to Linux

cursor.Login()

change to root

cursor.SendInterActiveCmd( 'su - ' , [ ('Password: ' , 'rootPassord')] , False)

Exec Command

for cmd in cmdList : cursor.SendCmd( cmd )

logout

cursor.Logout()

Python, 124 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import telnetlib
import sys

class RemoteShellCommand:
    def __init__(self, HostIP , UserName , Password  , OutputFileName = ''):
        # Set inner info
        self.info = {}
        
        ## prompt dict
        self.info['EndList'] = ["\n"]

        ## time out
        self.info['TimeOut'] = 30

        ## connection info
        self.info['HostIP'] = HostIP
        self.info['UserName'] = UserName
        self.info['Password'] = Password

        # init connection
        self.telnet_con = None
        self.read_eoc = False
        self.result_buf = ''

        # init output
        self.outputter = None
        if OutputFileName :
            self.outputter = open( OutputFileName , 'wb' )
        else:
            self.outputter = sys.stdout



    def __del__( self ):
        if self.telnet_con :
            self.telnet_con.close()
            self.telnet_con = None

        if (  self.outputter != sys.stdout ) and ( not self.outputter ) :
            self.close()
            self.outputter = None

    def Login( self ):
        if ( not self.info['HostIP'] ) or self.telnet_con :
            raise

        # init connection
        self.telnet_con = telnetlib.Telnet()
        self.telnet_con.open( self.info['HostIP'] )

        # login
        self.telnet_con.read_until ( 'login: ' ,  self.info['TimeOut'] )
        self.telnet_con.write( self.info['UserName'] + "\n" )
        self.telnet_con.read_until ( 'Password: ' ,  self.info['TimeOut'] )
        self.telnet_con.write( self.info['Password'] + "\n" )
        self.read_eoc = False

    def Logout( self ):
        if self.telnet_con :
            self.telnet_con.close()
            self.telnet_con = None

        if (  self.outputter != sys.stdout ) and ( not self.outputter ) :
            self.close()
            self.outputter = None


    def AddPrompt( self , content ):
        if content :
            self.info['EndList'].insert( 0 , content )
        else:
            raise


    def SendCmd( self , cmd , IsHasResult = True ):
        # clear the result buffer of command
        self.ReadBuffer()
        
        # send command
        self.result_buf = ''
        self.read_eoc = False
        self.telnet_con.write( cmd + "\n" )

        # collection the result
        if IsHasResult :
            self.ReadBuffer()

    def SendInterActiveCmd( self , cmd , InteractiveList , IsHasResult = True ):
        # check valid connection
        if not self.telnet_con :
            raise


        # clear the result buffer of command
        self.ReadBuffer()

        # send command
        self.result_buf = ''
        self.read_eoc = False
        self.telnet_con.write( cmd + "\n" )
        
        # Send Interactive command
        for each in InteractiveList :
            self.telnet_con.read_until( each[0] , self.info['TimeOut'] )
            self.telnet_con.write( each[1] + "\n" )

        # read command result
        if IsHasResult :
            self.ReadBuffer()

    def ReadBuffer( self ) :
        if self.read_eoc :
            return

        self.result_buf = ''

        matchIndex , matchObj , self.result_buf = self.telnet_con.expect( self.info['EndList'] , self.info['TimeOut'] )
        self.outputter.write( self.result_buf )

        while matchIndex == len(self.info['EndList']) - 1 :
            matchIndex , matchObj , self.result_buf = self.telnet_con.expect( self.info['EndList'] , self.info['TimeOut'] )
            self.outputter.write( self.result_buf )

        self.read_eoc = True