While developing <a href="http://pieces.openpolitics.com/thunderbayes/">ThunderBayes</a> (an extension for Thunderbird) I wrote this simple POP3 server to serve a message to my ThunderBayes test setup.
"""pypopper: a file-based pop3 server
Usage:
python pypopper.py [<host>:]<port> <path_to_message_file>
"""
import logging
import os
import socket
import sys
import traceback
# Configure the root logger's output format: logger name, level, message.
logging.basicConfig(format="%(name)s %(levelname)s - %(message)s")
# Module-wide logger used by the connection wrapper, the handlers and serve().
log = logging.getLogger("pypopper")
# At INFO the per-line "send:"/"recv:" traces (logged at DEBUG) stay hidden.
log.setLevel(logging.INFO)
class ChatterboxConnection(object):
    """Socket wrapper that exchanges CRLF-terminated lines and logs them.

    Any attribute not defined here (``close``, ``fileno``, ...) is looked up
    on the wrapped connection object.
    """

    END = "\r\n"

    def __init__(self, conn):
        self.conn = conn

    def __getattr__(self, name):
        # Only reached for names missing on the wrapper itself.
        return getattr(self.conn, name)

    def sendall(self, data, END=END):
        """Send ``data`` followed by the ``END`` terminator.

        Payloads of 50 characters or more are truncated in the debug log.
        """
        if len(data) < 50:
            log.debug("send: %r", data)
        else:
            log.debug("send: %r...", data[:50])
        self.conn.sendall(data + END)

    def recvall(self, END=END):
        """Read from the connection until ``END`` arrives and return one line.

        Returns the text received before the first ``END``; anything that
        arrived after it in the same read is discarded.

        Raises:
            socket.error: the peer closed the connection before a complete
                line was received.
        """
        received = ""
        while END not in received:
            chunk = self.conn.recv(4096)
            if not chunk:
                # recv() returns an empty string once the peer has closed the
                # socket; without this check the loop would never terminate.
                raise socket.error(
                    "connection closed by peer before end of line")
            received += chunk
        line = received[:received.index(END)]
        log.debug("recv: %r", line)
        return line
class Message(object):
    """The single mail message served to clients, loaded from a file.

    Attributes set by the constructor:
        data: the complete raw message text.
        size: length of ``data``, reported to clients as the octet count.
        top:  everything before the first blank line (the header section).
        bot:  everything after it, split into a list of lines on CRLF.
    """

    def __init__(self, filename):
        # Binary mode keeps CRLF line endings intact. Text mode may translate
        # them (it does on Windows), after which the "\r\n\r\n" header/body
        # separator can no longer be found.
        msg = open(filename, "rb")
        try:
            data = msg.read()
        finally:
            msg.close()
        if not isinstance(data, str):
            # Python 3 returns bytes here. latin-1 maps each byte to exactly
            # one character, so len() below is still the octet count.
            data = data.decode("latin-1")
        self.data = data
        self.size = len(data)
        # partition() tolerates a file without a blank-line separator: the
        # whole text becomes the header and the body is one empty line,
        # instead of the constructor failing with ValueError.
        self.top, _sep, bot = data.partition("\r\n\r\n")
        self.bot = bot.split("\r\n")
def handleUser(data, msg):
    """Reply to USER: every user name is accepted; both arguments are unused."""
    reply = "+OK user accepted"
    return reply
def handlePass(data, msg):
    """Reply to PASS: every password is accepted; both arguments are unused."""
    reply = "+OK pass accepted"
    return reply
def handleStat(data, msg):
    """Reply to STAT: always one message, whose size is ``msg.size``."""
    octets = msg.size
    return "+OK 1 %i" % (octets,)
def handleList(data, msg):
    """Reply to LIST, with or without a message-number argument.

    Args:
        data: the full command line as received, e.g. "LIST" or "LIST 1".
        msg: the served message; only ``msg.size`` is read.

    Returns:
        Without an argument, the multi-line scan listing for the one message.
        With the argument "1", a single-line "+OK 1 <size>" reply. With any
        other argument, an "-ERR" reply.
    """
    words = data.split()
    if len(words) > 1:
        # A LIST that names a message expects a one-line answer; sending the
        # multi-line listing would leave stray lines in the client's stream.
        if words[1] != "1":
            return "-ERR no such message, only 1 message in maildrop"
        return "+OK 1 %i" % msg.size
    return "+OK 1 messages (%i octets)\r\n1 %i\r\n." % (msg.size, msg.size)
def handleTop(data, msg):
    """Reply to "TOP <msg> <n>": the headers plus the first n body lines.

    Args:
        data: the full command line as received, e.g. "TOP 1 10".
        msg: the served message; ``msg.top`` (header text) and ``msg.bot``
            (list of body lines) are read.

    Returns:
        A multi-line "+OK" response ending in a lone ".", or an "-ERR" line
        when the command is malformed, names a message other than 1, or has
        a line count that is not a non-negative integer.
    """
    words = data.split()
    if len(words) != 3:
        return "-ERR usage: TOP msg n"
    num, count = words[1], words[2]
    if num != "1":
        return "-ERR unknown message number: %s" % num
    try:
        lines = int(count)
    except ValueError:
        return "-ERR invalid line count: %s" % count
    if lines < 0:
        return "-ERR invalid line count: %s" % count
    text = msg.top + "\r\n\r\n" + "\r\n".join(msg.bot[:lines])
    # Byte-stuff: a line starting with "." gets an extra "." so the client
    # cannot mistake it for the "." that terminates the response.
    stuffed = "\r\n".join(
        ("." + line) if line.startswith(".") else line
        for line in text.split("\r\n"))
    return "+OK top of message follows\r\n%s\r\n." % stuffed
def handleRetr(data, msg):
    """Reply to RETR with the whole message as a multi-line response.

    Args:
        data: the command line as received (not inspected).
        msg: the served message; ``msg.size`` and ``msg.data`` are read.

    Returns:
        "+OK <size> octets", the message text, and the terminating ".".
        The reported size is that of the original, unstuffed message.
    """
    log.info("message sent")
    # Byte-stuff: a line starting with "." gets an extra "." so a message
    # line consisting of "." alone cannot end the response early.
    stuffed = "\r\n".join(
        ("." + line) if line.startswith(".") else line
        for line in msg.data.split("\r\n"))
    return "+OK %i octets\r\n%s\r\n." % (msg.size, stuffed)
def handleDele(data, msg):
    """Reply to DELE with an acknowledgement; the arguments are not used."""
    reply = "+OK message 1 deleted"
    return reply
def handleNoop(data, msg):
    """Reply to NOOP with a bare positive response."""
    reply = "+OK"
    return reply
def handleQuit(data, msg):
    """Reply to QUIT with the sign-off line; the arguments are not used."""
    reply = "+OK pypopper POP3 server signing off"
    return reply
# Maps each supported POP3 command keyword to the function that builds its
# reply; every handler is called as handler(data, msg).
dispatch = {
    "USER": handleUser,
    "PASS": handlePass,
    "STAT": handleStat,
    "LIST": handleList,
    "TOP": handleTop,
    "RETR": handleRetr,
    "DELE": handleDele,
    "NOOP": handleNoop,
    "QUIT": handleQuit,
}
def _run_session(conn, msg):
    """Speak POP3 with one connected client until it sends QUIT.

    ``conn`` is a ChatterboxConnection and ``msg`` the Message being served.
    Socket errors are left to propagate to the caller.
    """
    conn.sendall("+OK pypopper file-based pop3 server ready")
    while True:
        data = conn.recvall()
        words = data.split(None, 1)
        # A blank line carries no keyword at all. POP3 keywords are
        # case-insensitive, so normalise before the table lookup.
        handler = dispatch.get(words[0].upper()) if words else None
        if handler is None:
            conn.sendall("-ERR unknown command")
            continue
        try:
            reply = handler(data, msg)
        except Exception:
            # One malformed command must not take the whole server down.
            log.exception("error while handling %r", data)
            reply = "-ERR command failed"
        conn.sendall(reply)
        if handler is handleQuit:
            break


def serve(host, port, filename):
    """Serve the message stored in ``filename`` over POP3 until interrupted.

    Clients are handled one at a time. The file is re-read for every new
    connection. A client that disconnects mid-session is logged and the
    server goes on to accept the next one.

    Args:
        host: address to bind to; an empty string is passed to bind() as is
            and shown as "localhost" in the log.
        port: TCP port number to listen on.
        filename: path of the message file to serve.

    Raises:
        IOError: ``filename`` does not exist.
        socket.error: the address could not be bound.

    SystemExit/KeyboardInterrupt stop the server quietly. Any other error
    raised while serving is logged as fatal and ends the function.
    """
    if not os.path.exists(filename):
        raise IOError("File not found: %s" % filename)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
    except socket.error:
        # Do not leak the descriptor when the address is unavailable.
        sock.close()
        raise
    try:
        hostname = host or "localhost"
        log.info("pypopper POP3 serving '%s' on %s:%s", filename, hostname, port)
        sock.listen(1)
        while True:
            conn, addr = sock.accept()
            log.debug('Connected by %s', addr)
            try:
                # Loaded outside the socket.error guard: a message file that
                # cannot be read is a fatal error, not a dropped client.
                msg = Message(filename)
                try:
                    _run_session(ChatterboxConnection(conn), msg)
                except socket.error as ex:
                    log.warning("connection from %s lost: %s", addr, ex)
            finally:
                conn.close()
    except (SystemExit, KeyboardInterrupt):
        log.info("pypopper stopped")
    except Exception:
        log.critical("fatal error", exc_info=True)
    finally:
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # A listening socket is not connected; some platforms refuse to
            # shut it down. It still has to be closed below.
            pass
        sock.close()
if __name__ == "__main__":
    # Command-line entry point: pypopper.py [<host>:]<port> <message_file>
    if len(sys.argv) != 3:
        print("USAGE: [<host>:]<port> <path_to_message_file>")
    else:
        _, port, filename = sys.argv
        # Text before the first ":" is the bind host; without a ":" the
        # host stays empty and serve() receives it unchanged.
        if ":" in port:
            host, _sep, port = port.partition(":")
        else:
            host = ""
        try:
            port = int(port)
        except ValueError:
            # int() on a string signals a bad number with ValueError only.
            print("Unknown port: %s" % port)
        else:
            if os.path.exists(filename):
                serve(host, port, filename)
            else:
                print("File not found: %s" % filename)
|
Save the above code in a file named pypopper.py. Then run it with two arguments (server port and file name) something like this:
python pypopper.py 110 message.txt
This starts the server on port 110 and serves the contents of message.txt every time the client requests new messages. (With no host given the server binds to all local interfaces, so a client on the same machine reaches it at localhost:110.) Please note, this server SHOULD NOT BE USED IN PRODUCTION! It is not secure; it is only meant for testing and learning about POP3. It implements a minimal subset of the POP3 protocol, just enough to test ThunderBayes.
On Windows, add 'rb' at line 49: msg = open(filename, "r") should be msg = open(filename, "rb")
Valery.