
This small class starts an SMTP server that acts as an email sink: it accepts mail addressed to any recipient and appends every message to a Portable Unix Mailbox (mbox) file. This is handy for testing applications that send email. The server runs in its own thread, so a test fixture can start it, collect the emails, and verify them for correctness.

Python, 84 lines
# SmtpMailsink Copyright 2005 Aviarc Corporation
# Written by Adam Feuer, Matt Branthwaite, and Troy Frever

import sys, asyncore, threading, socket, smtpd, time, StringIO

class SmtpMailsinkServer(smtpd.SMTPServer):
    __version__ = 'Python SMTP Mail Sink version 0.1'

    def __init__( self, *args, **kwargs):
        smtpd.SMTPServer.__init__( self, *args, **kwargs )
        self.mailboxFile = None

    def setMailsinkFile( self, mailboxFile ):
        self.mailboxFile = mailboxFile
        
    def process_message(self, peer, mailfrom, rcpttos, data):
        if self.mailboxFile is not None:
            self.mailboxFile.write( "From %s\n" % mailfrom )
            self.mailboxFile.write( data )
            self.mailboxFile.write( "\n\n" )
            self.mailboxFile.flush()

class SmtpMailsink( threading.Thread ):
    TIME_TO_WAIT_BETWEEN_CHECKS_TO_STOP_SERVING = 0.001

    def __init__( self, host = "localhost", port = 8025, mailboxFile = None, threadName = None ):   
        self.throwExceptionIfAddressIsInUse( host, port )
        self.initializeThread( threadName )
        self.initializeSmtpMailsinkServer( host, port, mailboxFile )

    def throwExceptionIfAddressIsInUse( self, host, port ):
        testSocket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
        testSocket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR,
                               testSocket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 )
        testSocket.bind( ( host, port ) )
        testSocket.close()

    def initializeThread( self, threadName ):
        self._stopevent = threading.Event()
        self.threadName = threadName
        if self.threadName is None:
            self.threadName = self.__class__.__name__  # thread names must be strings
        threading.Thread.__init__( self, name = self.threadName )
        
    def initializeSmtpMailsinkServer( self, host, port, mailboxFile ):
        self.smtpMailsinkServer = SmtpMailsinkServer( ( host, port ), None )
        self.resetMailbox( mailboxFile )
        smtpd.__version__ = SmtpMailsinkServer.__version__ 
                
    def resetMailbox( self, mailboxFile = None ):
        self.mailboxFile = mailboxFile
        if self.mailboxFile is None:
            self.mailboxFile = StringIO.StringIO()
        self.smtpMailsinkServer.setMailsinkFile( self.mailboxFile )

    def getMailboxContents( self ):
        return self.mailboxFile.getvalue()
    
    def getMailboxFile( self ):
        return self.mailboxFile
    
    def run( self ):
        while not self._stopevent.isSet():
            asyncore.loop( timeout = SmtpMailsink.TIME_TO_WAIT_BETWEEN_CHECKS_TO_STOP_SERVING, count = 1 )

    def stop( self, timeout=None ):
        self._stopevent.set()
        threading.Thread.join( self, timeout )
        self.smtpMailsinkServer.close()
        
if __name__ == "__main__":
    if len( sys.argv ) < 2 or len( sys.argv ) > 3:
        print "Usage: python SmtpMailsink.py mailsinkfile [hostname]"
        sys.exit( 1 )
    mailfile = sys.argv[1]
    hostname = "localhost"
    if len( sys.argv ) > 2:
        hostname = sys.argv[ 2 ]
    fileobject = open( mailfile, "w" )
    smtpMailsink = SmtpMailsink( host = hostname, mailboxFile = fileobject )
    smtpMailsink.setDaemon( True )  # daemon thread, so Ctrl-C in the main thread ends the process
    smtpMailsink.start()
    while True:
        time.sleep( 1 )

SmtpMailsinkServer inherits from Python 2.4's smtpd.SMTPServer class. SmtpMailsink is a simple driver class for controlling the server. Mail messages are written to a Portable Unix Mailbox file, which is easy to parse using Python's email and mailbox modules, as shown in the example below.

The __main__ portion of the code starts the mailsink and runs until interrupted; press Ctrl-C to quit. This is handy for manual testing. For automated tests, use the SmtpMailsink class directly (see example).
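The driver's run() and stop() methods follow a common pattern: the worker loop does a short unit of work, re-checks a threading.Event, and exits once the event is set. The sketch below shows that pattern in isolation. It is written for Python 3, and StoppableWorker with its sleep-based "work" is a hypothetical stand-in for the asyncore loop, not part of the recipe.

```python
import threading
import time


class StoppableWorker(threading.Thread):
    """Hypothetical worker that polls a stop flag between short units of work."""

    POLL_INTERVAL = 0.001  # seconds; plays the role of the asyncore.loop timeout

    def __init__(self):
        threading.Thread.__init__(self)
        self._stop_event = threading.Event()
        self.iterations = 0

    def run(self):
        # Do one short unit of work per pass, then re-check the flag.
        while not self._stop_event.is_set():
            self.iterations += 1
            time.sleep(self.POLL_INTERVAL)

    def stop(self, timeout=None):
        # Signal the loop to finish, then wait for the thread to exit.
        self._stop_event.set()
        self.join(timeout)


worker = StoppableWorker()
worker.start()
worker.stop(timeout=5)
print(worker.is_alive())
```

A short poll interval keeps stop() responsive at the cost of a busier loop, which is presumably why the recipe uses a timeout of 0.001 seconds.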

Note that Python's smtpd.SMTPServer class uses a module variable called "__version__" which gets printed as the server's greeting. SmtpMailsink sets __version__ to a different value, which has the side-effect of setting it for all users in this process. A class level variable in the smtpd.SMTPServer base class would fix this issue.
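The difference between the two approaches can be illustrated without the real smtpd module. In this Python 3 sketch, fake_smtpd and all three server classes are hypothetical stand-ins: one reads its greeting from a shared module variable, the other from a class attribute that a subclass can override.

```python
import types

# Hypothetical stand-in for a module that keeps its greeting in a module-level
# variable, as the text describes for smtpd. This is not the real smtpd module.
fake_smtpd = types.ModuleType("fake_smtpd")
fake_smtpd.__version__ = "Stock greeting"


class ModuleLevelServer:
    """Reads the greeting from the shared module variable."""

    def greeting(self):
        return fake_smtpd.__version__


class ClassLevelServer:
    """Reads the greeting from a class attribute that subclasses can override."""

    version = "Stock greeting"

    def greeting(self):
        return self.version


class ClassLevelMailsink(ClassLevelServer):
    version = "Python SMTP Mail Sink version 0.1"


# Changing the module variable changes the greeting of every server in the process,
# including ones that were created earlier by unrelated code.
unrelated_server = ModuleLevelServer()
fake_smtpd.__version__ = "Python SMTP Mail Sink version 0.1"
print(unrelated_server.greeting())

# Overriding the class attribute affects only the subclass.
print(ClassLevelServer().greeting())
print(ClassLevelMailsink().greeting())
```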

Requires Python 2.4 (or 2.3 + smtpd.py + asyncore.py from Python 2.4).

Adam Feuer, Matt Branthwaite, and Troy Frever, Aviarc Corporation, http://www.aviarc.com/

Example of using the SmtpMailsink class:

<pre>
import email, mailbox, smtplib, StringIO
from SmtpMailsink import SmtpMailsink

if __name__ == '__main__':
    # Start the sink on its default address, localhost:8025.
    smtpMailsink = SmtpMailsink()
    smtpMailsink.start()

    fromAddress = "someone@fromAddress.com"
    toAddress = "nobody@toAddress.com"
    message = "From: %s\r\nTo: %s\r\n\r\n" % ( fromAddress, toAddress )
    message += "This is a test message."
    smtpSender = smtplib.SMTP( 'localhost', 8025 )
    smtpSender.sendmail( fromAddress, toAddress, message )
    smtpSender.quit()

    smtpMailsink.stop()

    # Parse the collected mail back into email message objects.
    mailboxFile2 = StringIO.StringIO( smtpMailsink.getMailboxContents() )
    mailboxObject = mailbox.PortableUnixMailbox( mailboxFile2, email.message_from_file )
    for receivedMessage in mailboxObject:
        print "Header:"
        print "To: %s\nFrom: %s\n" % ( receivedMessage.get( "To" ), receivedMessage.get( "From" ) )
        print "Message Text:"
        print receivedMessage.as_string()
</pre>
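The example above relies on Python 2's mailbox.PortableUnixMailbox, which is not available in Python 3. As a rough sketch, assuming Python 3 and its mailbox.mbox class, mailbox text in the same format could be parsed as follows. Because mailbox.mbox works on a file path, the text is first written to a temporary file; parse_mbox_text and the sample addresses are hypothetical.

```python
import mailbox
import os
import tempfile


def parse_mbox_text(mbox_text):
    """Return the messages found in mbox-formatted text as a list."""
    fd, path = tempfile.mkstemp(suffix=".mbox")
    try:
        # mailbox.mbox reads from a path, so spill the text to a temp file first.
        with os.fdopen(fd, "w") as handle:
            handle.write(mbox_text)
        box = mailbox.mbox(path, create=False)
        try:
            # Each item is fully loaded into memory, so it outlives the file.
            return list(box)
        finally:
            box.close()
    finally:
        os.remove(path)


# Sample text in the layout the sink writes: "From <sender>", the message, two newlines.
sample = (
    "From a@example.com\n"
    "From: a@example.com\nTo: b@example.com\n\nFirst body\n\n\n"
    "From c@example.com\n"
    "From: c@example.com\nTo: d@example.com\n\nSecond body\n\n\n"
)

for received in parse_mbox_text(sample):
    print("To: %s, From: %s" % (received["To"], received["From"]))
```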

2 comments

Hans-Peter Jansen 16 years, 1 month ago

From escaping?

Nice tool, but shouldn't it escape lines starting with 'From ', at least in the data section, when using an mbox-style file backend?

Pseudo-diff:

-         self.mailboxFile.write( data )
+         self.mailboxFile.write( data.replace('\nFrom ', '\n>From ') )

Pete
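The escaping suggested above could be sketched as a small helper. A plain replace of '\nFrom ' would miss a 'From ' line at the very start of the data, so this illustration uses a multiline regular expression instead. It is a sketch of the mbox ">From " quoting convention, not the recipe's code, and escape_from_lines is a hypothetical name.

```python
import re

# Matches "From " at the start of the data or at the start of any later line.
_FROM_LINE = re.compile(r"^From ", re.MULTILINE)


def escape_from_lines(data):
    """Prefix every body line that begins with 'From ' with '>'."""
    return _FROM_LINE.sub(">From ", data)


print(escape_from_lines("From the start\nplain line\nFrom the middle\n"))
```

Header lines such as "From: someone@example.com" are left alone, because the pattern requires a space directly after "From".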

Radek Svarz 16 years, 1 month ago

License for frameworks?

Wow, great, exactly what I was looking for.

By the way, this could be integrated into some web frameworks. However, I wonder what its license is. Could we use it in BSD-like licensed frameworks?