A lightweight and portable way to hex-dump network traffic between network applications.
#!/usr/bin/env python
# hex-dump network proxy: run with "-h" to see usage information
"""usage: PROGRAM [options] <fromport:hostname:toport> <...>
A port forwarding proxy server that hex-dumps traffic in both directions.
Example:
Listen for and forward connections on port 8080 to port 80 on
www.google.com, hex-dumping the network traffic in both directions:
$ PROGRAM 8080:www.google.com:80
Copyright (C) 2005-2007 Andrew Ellerton, mail: activestate-at->ellerton.net
"""
import sys, getopt, logging, re
try: import twisted
except ImportError:
print "Please install the 'twisted' package from http://twistedmatrix.com/"
sys.exit(1)
from twisted.python import failure
from twisted.internet import reactor, error, address, tcp
from twisted.internet.protocol import Protocol, Factory, ClientFactory
# --------------------------------------------------------------------------
# This GREAT hexdump function from Sebastian Keim's Python recipe at:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/142812
# --------------------------------------------------------------------------
# Lookup table indexed by byte value (0-255) giving the character shown in the
# text column of the dump.  A value is kept when it is ASCII and its repr() is
# a single character between quotes; everything else becomes '.'.  The
# explicit "x < 128" keeps the table identical on Python 2 and Python 3
# (on Python 3, chr() of some values >= 128 has a one-character repr).
HEX_FILTER = ''.join([chr(x) if (x < 128 and len(repr(chr(x))) == 3) else '.'
                      for x in range(256)])


def hexdump(prefix, src, length=16):
    """Return a multi-line hex dump of *src*.

    Each output line has the form::

        <prefix> <offset as 4 hex digits> <hex bytes, space separated> <text>

    and ends with a newline.  The hex column is padded to ``length * 3``
    characters so the text column lines up across rows.

    prefix -- string placed at the start of every line (e.g. '->').
    src    -- data to dump: a byte string (``str`` on Python 2, ``bytes`` or
              ``bytearray`` on Python 3) or a text string, which is dumped by
              character ordinal.
    length -- number of bytes shown per line.

    Returns '' when *src* is empty.
    """
    if not src:
        return ''
    rows = []
    for offset in range(0, len(src), length):
        chunk = src[offset:offset + length]
        if isinstance(chunk, (bytes, bytearray)):
            # bytearray iterates as ints on both Python 2 and Python 3.
            values = list(bytearray(chunk))
        else:
            values = [ord(ch) for ch in chunk]
        hexa = ' '.join(["%02X" % v for v in values])
        # Ordinals outside the table (only possible for text input) are
        # shown as '.' like any other non-printable value.
        text = ''.join([HEX_FILTER[v] if v < 256 else '.' for v in values])
        rows.append("%s %04X %-*s %s\n" % (prefix, offset, length * 3, hexa, text))
    # Join once at the end instead of growing a string inside the loop.
    return ''.join(rows)
# --------------------------------------------------------------------------
def transport_info(t):
    """Return a short printable description of a transport or address.

    An IPv4Address is rendered as "host:port".  A tcp.Client is described
    by formatting whatever its getHost() returns.  Any other object falls
    back to its repr().
    """
    if isinstance(t, address.IPv4Address):
        return "%s:%d" % (t.host, t.port)
    if isinstance(t, tcp.Client):
        return transport_info(t.getHost())
    return repr(t)
class HexdumpClientProtocol(Protocol):
    """Protocol for the connection from the proxy to the remote server.

    Data received from the remote server is handed to the factory's owner
    via owner.write(); data queued in the factory's writeCache before this
    connection opened is flushed as soon as the connection is made.
    """

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        """Log the new connection, record its name on the owner, flush the cache."""
        logger.debug("bridge connection open %s" % transport_info(self.transport))
        owner = self.factory.owner
        owner.clientName = transport_info(self.transport)
        self.writeCache()

    def write(self, buf):
        """Send *buf* over this connection's transport."""
        self.transport.write(buf)

    def dataReceived(self, recvd):
        """Pass data received on this connection to the owner."""
        self.factory.owner.write(recvd)

    def writeCache(self):
        """Send every buffer queued in the factory's writeCache, oldest first."""
        pending = self.factory.writeCache
        while pending:
            self.write(pending.pop(0))
class HexdumpClientFactory(ClientFactory):
    """Client factory for the proxy's connection to the remote server.

    owner      -- the object that created this factory; it receives the
                  built protocol as its `dest` attribute and is close()d
                  when the remote connection is lost or fails.
    writeCache -- list of buffers waiting to be sent once the remote
                  connection is open.
    """

    def __init__(self, owner):
        self.writeCache = []
        self.owner = owner

    def startedConnecting(self, connector):
        logger.debug('connection opening...')

    def buildProtocol(self, addr):
        """Create the client protocol and register it on the owner as `dest`."""
        logger.debug('connecting to remote server %s' % transport_info(addr))
        protocol = HexdumpClientProtocol(self)
        self.owner.dest = protocol
        return protocol

    def clientConnectionLost(self, connector, reason):
        """Log why the remote connection ended, then close the owner."""
        if isinstance(reason, failure.Failure):
            if reason.type != twisted.internet.error.ConnectionDone:
                logger.info("remote server connection lost, reason: %r" % reason)
            else:
                logger.info("remote server closed connection")
        self.owner.close()

    def clientConnectionFailed(self, connector, reason):
        """Log the failed connection attempt, then close the owner."""
        logger.debug("dest: connection failed: %r" % reason)
        self.owner.close()
class HexdumpServerProtocol(Protocol):
    """Protocol for a client connected to the proxy's listening port.

    For each accepted client it opens a connection to the factory's
    remote_host:remote_port, hex-dumps traffic in both directions to the
    logger, and relays it between the two connections.

    Attributes:
      factory       -- the server factory that built this protocol.
      clientFactory -- HexdumpClientFactory used for the outbound connection.
      clientName    -- label for the client in log lines.
      serverName    -- "host:port" label for the remote server (set in
                       connectionMade).
      dest          -- the outbound protocol, assigned by
                       HexdumpClientFactory.buildProtocol; None until then
                       and again after close().
      connector     -- object returned by reactor.connectTCP, kept so a
                       pending connection attempt can be cancelled.
      closed        -- True once close() has been called.
    """

    def __init__(self, serverFactory):
        self.factory = serverFactory
        self.clientFactory = HexdumpClientFactory(self)
        self.clientName = id(self)  # this is repopulated later
        # NOTE: a method named serverName used to be defined on this class,
        # but this instance attribute always shadowed it, so it was removed.
        self.serverName = None
        # Initialised here so connectionLost/dataReceived can test it safely
        # even when the remote connection was never established.
        self.dest = None
        self.connector = None
        self.closed = False

    def connectionMade(self):
        """Log the new client and start connecting to the remote server."""
        self.serverName = "%s:%d" % (self.factory.remote_host, self.factory.remote_port)
        logger.info("client %s opened connection -> server %s" % (
            self.clientName, self.serverName))
        # cxn to this server has opened. Open a port to the destination...
        self.connector = reactor.connectTCP(self.factory.remote_host,
                                            self.factory.remote_port,
                                            self.clientFactory)

    def connectionLost(self, reason):
        """Client went away: tear down (or cancel) the remote connection."""
        logger.info("client %s closed connection" % self.clientName)  # str(reason)
        if self.dest is not None and self.dest.transport:
            self.dest.transport.loseConnection()
        elif self.connector is not None:
            # No established remote connection; cancel any attempt that is
            # still in progress so it does not linger after the client left.
            self.connector.disconnect()

    def connectionFailed(self, reason):
        logger.debug("proxy connection failed: %s" % str(reason))

    def dataReceived(self, recvd):
        """Hex-dump data from the client and forward it to the remote server."""
        logger.info("client %s -> server %s (%d bytes)\n%s" % (
            self.clientName, self.serverName, len(recvd), hexdump('->', recvd)))
        if self.dest is not None:
            self.dest.write(recvd)
        elif self.closed:
            # close() already dropped the remote side; nothing can receive this.
            logger.debug("discarding data: remote connection is closed")
        else:
            logger.debug("caching data until remote connection is open")
            self.clientFactory.writeCache.append(recvd)

    def write(self, buf):
        """Hex-dump data from the remote server and send it to the client."""
        logger.info("client %s <= server %s (%d bytes)\n%s" % (
            self.clientName, self.serverName, len(buf), hexdump('<=', buf)))
        self.transport.write(buf)

    def close(self):
        """Forget the remote protocol and drop the client connection."""
        self.dest = None
        self.closed = True
        self.transport.loseConnection()
class HexdumpServerFactory(Factory):
    """Factory for one forwarded port: listen_port -> remote_host:remote_port.

    max_connections and numConnections are stored but not consulted
    anywhere in this class.
    """

    def __init__(self, listen_port, remote_host, remote_port, max_connections=None):
        self.numConnections = 0
        self.max_connections = max_connections
        self.remote_port = remote_port
        self.remote_host = remote_host
        self.listen_port = listen_port

    def startFactory(self):
        """Log the port mapping this factory serves."""
        route = (self.listen_port, self.remote_host, self.remote_port)
        logger.info("listening on %d -> %s:%d" % route)

    def buildProtocol(self, addr):  # could process max/num connections here
        """Return a new HexdumpServerProtocol bound to this factory."""
        protocol = HexdumpServerProtocol(self)
        return protocol
# Root logger used by every class in this file.  Defined at import time (not
# only when run as a script) so the classes also work when this module is
# imported by another program.
logger = logging.getLogger()

# <listen port>:<remote host>:<remote port>.  Anchored at both ends so that
# trailing garbage (e.g. "8080:host:80junk") is rejected.
port_map_pattern = re.compile(r"^(\d+):([\w.\-]+):(\d+)$")

if __name__ == "__main__":
    import os.path

    __usage__ = __doc__.replace("PROGRAM", os.path.basename(sys.argv[0]))

    def die_usage(msg=""):
        """Write the usage text plus *msg* to stderr and exit with status 1."""
        sys.stderr.write("%s%s\n" % (__usage__, msg))
        sys.exit(1)

    def add_factory(port_map_desc):
        """Parse one port-map argument and start listening for it."""
        match = port_map_pattern.match(port_map_desc)
        if not match:
            die_usage("malformed port map description: %s" % port_map_desc)
        listen_port = int(match.group(1))
        remote_host = match.group(2)
        remote_port = int(match.group(3))
        factory = HexdumpServerFactory(listen_port, remote_host, remote_port)
        reactor.listenTCP(factory.listen_port, factory)

    try:
        opts, args = getopt.getopt(sys.argv[1:], "hl:v", ["help", "log=", "verbose"])
    except getopt.GetoptError as e:
        die_usage(str(e))

    logname = None
    logger.setLevel(logging.INFO)
    for o, a in opts:
        if o in ("-h", "--help"):
            # An explicit help request is not an error: stdout, status 0.
            sys.stdout.write("%s\n" % __usage__)
            sys.exit(0)
        if o in ("-v", "--verbose"):
            logger.setLevel(logging.DEBUG)
        if o in ("-l", "--log"):
            logname = a
            print("log: [%s]" % logname)

    if logname:
        handler = logging.FileHandler(logname)
    else:
        handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if not args:
        die_usage("No proxy/port forwarding connections specified")
    for a in args:
        add_factory(a)

    logger.info("ready (Ctrl+C to stop)")
    reactor.run()
    logger.info("stopped")
|
Let's say you're writing a network client program that connects to a remote server. Although most of the work is going well, there's a bug caused either by your program sending bogus content or the server responding with a broken reply. You're sure the problem is in the bytes being sent or received, but you can't diagnose it without seeing a dump of the actual bytes being transmitted.
Packet sniffing software is powerful, but may be overkill for this usage. A simple and portable solution is to run this hex-dump server in between your normal client and server programs. All traffic it receives, in both directions, is hex-dumped to stdout or a log file.
Here's how you use it. Let's say your normal connectivity setup looks like this:
Client (host 'oslo') -> Server (host 'zurich', port 1000)
To hexdump traffic, run an instance of the hex-dump server in between the two programs, like this:
Client (host 'oslo') -> Hex-dump Proxy (port 2222 - or any port you choose) -> Server (host 'zurich', port 1000)
Configuring and running a proxy is trivial - the syntax is borrowed from SSH:
you@oslo $ hexproxy.py 2222:zurich:1000 # run on the 'oslo' machine
or you could forward a port on the "zurich" machine itself:
you@zurich $ hexproxy.py 2222:zurich:1000 # run on the 'zurich' machine
Run your client again, but this time configure it to connect to oslo:2222 or zurich:2222, depending on which machine the proxy is running on.
As a practical example - here is a simple way to see all HTTP traffic sent and received when Firefox connects to Google. Run this in a shell:
you@oslo $ hexproxy.py 8080:www.google.com:80
and then load the Google homepage in Firefox or any browser. In the console, you will see: <pre> 2007-02-18 17:47:11,217 INFO listening on 8080 -> www.google.com:80 2007-02-18 17:47:11,217 INFO ready (Ctrl+C to stop) 2007-02-18 17:47:18,265 INFO client 11389528 opened connection -> server www.google.com:80 2007-02-18 17:47:18,312 INFO client 11389528 -> server www.google.com:80 (401 bytes) -> 0000 47 45 54 20 2F 20 48 54 54 50 2F 31 2E 31 0D 0A GET / HTTP/1.1.. -> 0010 48 6F 73 74 3A 20 6F 73 6C 6F 3A 38 30 38 30 0D Host: oslo:8080. -> 0020 0A 55 73 65 72 2D 41 67 65 6E 74 3A 20 4D 6F 7A .User-Agent: Moz -> 0030 69 6C 6C 61 2F 35 2E 30 20 28 57 69 6E 64 6F 77 illa/5.0 (Window -> 0040 73 3B 20 55 3B 20 57 69 6E 64 6F 77 73 20 4E 54 s; U; Windows NT -> 0050 20 35 2E 30 3B 20 65 6E 2D 55 53 3B 20 72 76 3A 5.0; en-US; rv: -> 0060 31 2E 38 2E 31 2E 31 29 20 47 65 63 6B 6F 2F 32 1.8.1.1) Gecko/2 -> 0070 30 30 36 31 32 30 34 20 46 69 72 65 66 6F 78 2F 0061204 Firefox/ -> 0080 32 2E 30 2E 30 2E 31 0D 0A 41 63 63 65 70 74 3A 2.0.0.1..Accept: -> 0090 20 74 65 78 74 2F 78 6D 6C 2C 61 70 70 6C 69 63 text/xml,applic -> 00A0 61 74 69 6F 6E 2F 78 6D 6C 2C 61 70 70 6C 69 63 ation/xml,applic -> 00B0 61 74 69 6F 6E 2F 78 68 74 6D 6C 2B 78 6D 6C 2C ation/xhtml+xml, -> 00C0 74 65 78 74 2F 68 74 6D 6C 3B 71 3D 30 2E 39 2C text/html;q=0.9, -> 00D0 74 65 78 74 2F 70 6C 61 69 6E 3B 71 3D 30 2E 38 text/plain;q=0.8 -> 00E0 2C 69 6D 61 67 65 2F 70 6E 67 2C 2A 2F 2A 3B 71 ,image/png,/;q -> 00F0 3D 30 2E 35 0D 0A 41 63 63 65 70 74 2D 4C 61 6E =0.5..Accept-Lan -> 0100 67 75 61 67 65 3A 20 65 6E 2D 75 73 2C 65 6E 3B guage: en-us,en; -> 0110 71 3D 30 2E 35 0D 0A 41 63 63 65 70 74 2D 45 6E q=0.5..Accept-En -> 0120 63 6F 64 69 6E 67 3A 20 67 7A 69 70 2C 64 65 66 coding: gzip,def -> 0130 6C 61 74 65 0D 0A 41 63 63 65 70 74 2D 43 68 61 late..Accept-Cha -> 0140 72 73 65 74 3A 20 49 53 4F 2D 38 38 35 39 2D 31 rset: ISO-8859-1 -> 0150 2C 75 74 66 2D 38 3B 71 3D 30 2E 37 2C 2A 3B 71 ,utf-8;q=0.7,*;q -> 
0160 3D 30 2E 37 0D 0A 4B 65 65 70 2D 41 6C 69 76 65 =0.7..Keep-Alive -> 0170 3A 20 33 30 30 0D 0A 43 6F 6E 6E 65 63 74 69 6F : 300..Connectio -> 0180 6E 3A 20 6B 65 65 70 2D 61 6C 69 76 65 0D 0A 0D n: keep-alive... -> 0190 0A .
2007-02-18 17:47:18,453 INFO client 192.168.1.12:1722 <- server www.google.com:80 (1357 bytes) <- 0000 48 54 54 50 2F 31 2E 31 20 32 30 30 20 4F 4B 0D HTTP/1.1 200 OK. <- 0010 0A 43 61 63 68 65 2D 43 6F 6E 74 72 6F 6C 3A 20 .Cache-Control: <- 0020 70 72 69 76 61 74 65 0D 0A 43 6F 6E 74 65 6E 74 private..Content <- 0030 2D 54 79 70 65 3A 20 74 65 78 74 2F 68 74 6D 6C -Type: text/html <- 0040 0D 0A 53 65 74 2D 43 6F 6F 6B 69 65 3A 20 50 52 ..Set-Cookie: PR ... </pre>
You can log to a file using the "-l" or "--log" switches.
Like SSH, you are not restricted to forwarding only one port. For example, port 8080 can go to Google, and 8081 to Yahoo!:
user@oslo $ hexproxy.py 8080:www.google.com:80 8081:www.yahoo.com:80
Limitations
I'm sure some error handling scenarios are not fully covered. However, for general use while developing and debugging network software, I've found this version covers everything I need.
References
The excellent hexdump recipe by Sebastian Keim: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/142812
Need to handle client disconnects. When the client which is connected to your HexdumpServerProtocol disconnects, you probably want to disconnect your HexdumpClientProtocol guy. So, in connectionLost on your server, you may want to put something like this:
Excellent recipe! Thanks!
Good idea. I'll add it in :)