Welcome, guest | Sign In | My Account | Store | Cart
import telnetlib
import sys

class RemoteShellCommand:
    def __init__(self, HostIP , UserName , Password  , OutputFileName = ''):
        # Set inner info
        self.info = {}
        
        ## prompt dict
        self.info['EndList'] = ["\n"]

        ## time out
        self.info['TimeOut'] = 30

        ## connection info
        self.info['HostIP'] = HostIP
        self.info['UserName'] = UserName
        self.info['Password'] = Password

        # init connection
        self.telnet_con = None
        self.read_eoc = False
        self.result_buf = ''

        # init output
        self.outputter = None
        if OutputFileName :
            self.outputter = open( OutputFileName , 'wb' )
        else:
            self.outputter = sys.stdout



    def __del__( self ):
        if self.telnet_con :
            self.telnet_con.close()
            self.telnet_con = None

        if (  self.outputter != sys.stdout ) and ( not self.outputter ) :
            self.close()
            self.outputter = None

    def Login( self ):
        if ( not self.info['HostIP'] ) or self.telnet_con :
            raise

        # init connection
        self.telnet_con = telnetlib.Telnet()
        self.telnet_con.open( self.info['HostIP'] )

        # login
        self.telnet_con.read_until ( 'login: ' ,  self.info['TimeOut'] )
        self.telnet_con.write( self.info['UserName'] + "\n" )
        self.telnet_con.read_until ( 'Password: ' ,  self.info['TimeOut'] )
        self.telnet_con.write( self.info['Password'] + "\n" )
        self.read_eoc = False

    def Logout( self ):
        if self.telnet_con :
            self.telnet_con.close()
            self.telnet_con = None

        if (  self.outputter != sys.stdout ) and ( not self.outputter ) :
            self.close()
            self.outputter = None


    def AddPrompt( self , content ):
        if content :
            self.info['EndList'].insert( 0 , content )
        else:
            raise


    def SendCmd( self , cmd , IsHasResult = True ):
        # clear the result buffer of command
        self.ReadBuffer()
        
        # send command
        self.result_buf = ''
        self.read_eoc = False
        self.telnet_con.write( cmd + "\n" )

        # collection the result
        if IsHasResult :
            self.ReadBuffer()

    def SendInterActiveCmd( self , cmd , InteractiveList , IsHasResult = True ):
        # check valid connection
        if not self.telnet_con :
            raise


        # clear the result buffer of command
        self.ReadBuffer()

        # send command
        self.result_buf = ''
        self.read_eoc = False
        self.telnet_con.write( cmd + "\n" )
        
        # Send Interactive command
        for each in InteractiveList :
            self.telnet_con.read_until( each[0] , self.info['TimeOut'] )
            self.telnet_con.write( each[1] + "\n" )

        # read command result
        if IsHasResult :
            self.ReadBuffer()

    def ReadBuffer( self ) :
        if self.read_eoc :
            return

        self.result_buf = ''

        matchIndex , matchObj , self.result_buf = self.telnet_con.expect( self.info['EndList'] , self.info['TimeOut'] )
        self.outputter.write( self.result_buf )

        while matchIndex == len(self.info['EndList']) - 1 :
            matchIndex , matchObj , self.result_buf = self.telnet_con.expect( self.info['EndList'] , self.info['TimeOut'] )
            self.outputter.write( self.result_buf )

        self.read_eoc = True

History