Use telnetlib to execute shell commands on a remote host from the local machine.
The example below shows how to use the module; the command output is written to result.log.
# Example: log in over telnet, switch to root, run a few commands and
# write everything the remote shell prints to result.log.
from PyRemoteControl import RemoteShellCommand

# host info
host_ip = '192.168.32.72'
user_name = 'fw'
password = 'fw'
result_file = 'result.log'

# command list
cmdList = ['cd', 'll']

# init: register the prompts that mark the end of a command's output
cursor = RemoteShellCommand(host_ip, user_name, password, result_file)
cursor.AddPrompt('[fw@localhost .]\$ ')
cursor.AddPrompt('[root@localhost .]# ')

# connect to Linux
cursor.Login()

# change to root; the password prompt is answered automatically and the
# result is not collected here (IsHasResult=False)
cursor.SendInterActiveCmd('su - ', [('Password: ', 'rootPassord')], False)

# exec commands
for cmd in cmdList:
    cursor.SendCmd(cmd)

# logout
cursor.Logout()
import telnetlib
import sys
class RemoteShellCommand:
    """Run shell commands on a remote host through a telnet session.

    Typical use: construct, register the shell prompt(s) with AddPrompt(),
    call Login(), send commands with SendCmd() / SendInterActiveCmd(), then
    call Logout().  Everything the remote side prints while a result is
    being collected is written to the output file (or to sys.stdout when no
    file name was given).

    NOTE(review): strings are handed to telnetlib unencoded, which matches
    Python 2 ``str`` semantics; Python 3's telnetlib expects bytes -- confirm
    the target interpreter before porting.
    """

    def __init__(self, HostIP, UserName, Password, OutputFileName=''):
        """Store the connection settings and open the output target.

        HostIP         -- host passed to Telnet.open() by Login().
        UserName       -- answer sent after the 'login: ' prompt.
        Password       -- answer sent after the 'Password: ' prompt.
        OutputFileName -- file that receives the remote output (opened in
                          'wb' mode, so an existing file is truncated);
                          when empty, output goes to sys.stdout.
        """
        self.info = {}
        # Patterns handed to Telnet.expect().  "\n" must stay LAST: ReadBuffer
        # keeps reading while the last pattern matched, i.e. while plain
        # lines (rather than a registered prompt) are arriving.
        self.info['EndList'] = ["\n"]
        # Timeout in seconds used for every read_until()/expect() call.
        self.info['TimeOut'] = 30
        self.info['HostIP'] = HostIP
        self.info['UserName'] = UserName
        self.info['Password'] = Password
        # Remembered so the log can be reopened after Logout() closed it.
        self.info['OutputFileName'] = OutputFileName

        self.telnet_con = None
        # True once the output of the last command was read up to a prompt.
        self.read_eoc = False
        self.result_buf = ''

        self.outputter = None
        if OutputFileName:
            self.outputter = open(OutputFileName, 'wb')
        else:
            self.outputter = sys.stdout

    def __del__(self):
        """Release the connection and the output file on garbage collection."""
        self._release()

    def _release(self):
        """Close the telnet connection and the output file, if any are open."""
        # getattr keeps this safe on a partially constructed instance.
        connection = getattr(self, 'telnet_con', None)
        try:
            if connection is not None:
                connection.close()
        finally:
            self.telnet_con = None
            output = getattr(self, 'outputter', None)
            # Never close sys.stdout: it does not belong to this object.
            if output is not None and output is not sys.stdout:
                output.close()
                self.outputter = None

    def _require_connection(self):
        """Raise RuntimeError unless a telnet connection is open."""
        if self.telnet_con is None:
            raise RuntimeError('not connected: call Login() first')

    def _write_output(self, text):
        """Write text to the output target, reopening it after a Logout()."""
        if self.outputter is None:
            file_name = self.info.get('OutputFileName')
            # Append so the output of the earlier session is preserved.
            self.outputter = open(file_name, 'ab') if file_name else sys.stdout
        self.outputter.write(text)

    def Login(self):
        """Open the telnet connection and answer the login/password prompts.

        Raises RuntimeError when no host is configured or a connection is
        already open.  Errors raised by Telnet.open() propagate unchanged.
        A prompt that never arrives is not detected: read_until() simply
        returns after the timeout and the answer is sent anyway.
        """
        if not self.info['HostIP']:
            raise RuntimeError('no host IP configured')
        if self.telnet_con is not None:
            raise RuntimeError('already connected: call Logout() first')

        # Publish the connection only once open() succeeded, so a failed
        # connect does not leave the object looking "already connected".
        connection = telnetlib.Telnet()
        connection.open(self.info['HostIP'])
        self.telnet_con = connection

        timeout = self.info['TimeOut']
        connection.read_until('login: ', timeout)
        connection.write(self.info['UserName'] + "\n")
        connection.read_until('Password: ', timeout)
        connection.write(self.info['Password'] + "\n")
        # The login banner / first prompt has not been consumed yet.
        self.read_eoc = False

    def Logout(self):
        """Close the telnet connection and the output file.

        sys.stdout is left open.  Safe to call when not connected.
        """
        self._release()

    def AddPrompt(self, content):
        """Register a pattern that marks the end of a command's output.

        The pattern is inserted at the front of the list given to
        Telnet.expect(), so the built-in "\n" entry stays last.
        Raises RuntimeError when content is empty.
        """
        if not content:
            raise RuntimeError('prompt pattern must not be empty')
        self.info['EndList'].insert(0, content)

    def SendCmd(self, cmd, IsHasResult=True):
        """Send one command line and optionally collect its output.

        cmd         -- command text; a newline is appended.
        IsHasResult -- when true, read (and log) output until a registered
                       prompt matches or the timeout expires.
        Raises RuntimeError when not connected.
        """
        self._require_connection()
        # Drain whatever the previous command left unread.
        self.ReadBuffer()

        self.result_buf = ''
        self.read_eoc = False
        self.telnet_con.write(cmd + "\n")

        if IsHasResult:
            self.ReadBuffer()

    def SendInterActiveCmd(self, cmd, InteractiveList, IsHasResult=True):
        """Send a command that asks questions, answering each in turn.

        cmd             -- command text; a newline is appended.
        InteractiveList -- sequence of (expected_text, answer) pairs; for
                           each pair the expected text is awaited with
                           read_until() and the answer plus newline is sent.
        IsHasResult     -- when true, read (and log) the remaining output
                           until a registered prompt matches or the timeout
                           expires.
        Raises RuntimeError when not connected.
        """
        self._require_connection()
        # Drain whatever the previous command left unread.
        self.ReadBuffer()

        self.result_buf = ''
        self.read_eoc = False
        self.telnet_con.write(cmd + "\n")

        timeout = self.info['TimeOut']
        for step in InteractiveList:
            self.telnet_con.read_until(step[0], timeout)
            self.telnet_con.write(step[1] + "\n")

        if IsHasResult:
            self.ReadBuffer()

    def ReadBuffer(self):
        """Read and log remote output until a prompt matches or time runs out.

        Does nothing when the current command's output was already consumed.
        Each chunk returned by Telnet.expect() is written to the output
        target; the last chunk is kept in self.result_buf.
        Raises RuntimeError when output is pending but no connection is open.
        """
        if self.read_eoc:
            return
        self._require_connection()

        patterns = self.info['EndList']
        timeout = self.info['TimeOut']
        # Index of the "\n" entry: matching it means "one more plain line",
        # anything else (a prompt, or -1 on timeout) ends the read.
        newline_index = len(patterns) - 1

        while True:
            matchIndex, matchObj, self.result_buf = self.telnet_con.expect(
                patterns, timeout)
            self._write_output(self.result_buf)
            if matchIndex != newline_index:
                break

        self.read_eoc = True
|