Welcome, guest | Sign In | My Account | Store | Cart
NOTE: Recipes have moved! Please visit GitHub.com/activestate/code for the current versions.

This is a fully featured RSS aggregator with parsing included.

It scales to very high numbers of feeds and can be used in a multi-client environment over the web, using Twisted with a little code on top of Nevow (www.nevow.com), or it can easily be integrated into any application that uses one of the toolkits supported by Twisted.

Python, 228 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
from twisted.internet import reactor, protocol, defer
from twisted.web import client
import feedparser, time, sys

# You can get this file from http://xoomer.virgilio.it/dialtone/out.py
# It only defines a list named rss_feed, built like this:
# rss_feed = [  ( 'URL', 'EXTRA_INFOS'),
#               ( 'URL', 'EXTRA_INFOS')
#               ...
#            ]
import out

try:
    import cStringIO as _StringIO
except ImportError:
    import StringIO as _StringIO

# The full feed list taken from the out module; each entry is indexed as
# feed[0] for the URL by the code below (730 feeds in the author's file).
rss_feeds = out.rss_feed

# Upper bound on the number of feed groups FeederFactory.start() creates.
# Each group is downloaded through its own Deferred chain, so this is the
# number of simultaneous connections.
DEFERRED_GROUPS = 60
# Max age (in seconds) of a cache entry before the feed is fetched again.
INTER_QUERY_TIME = 300
# Timeout in seconds passed to client.getPage() for each web request.
TIMEOUT = 30

# Module-wide feed cache, keyed by feed URL:
# { 'URL': (TIMESTAMP, parsed_feed) }
# TIMESTAMP is time.time() at the moment the feed was memoized.
cache = {}

class FeederProtocol(object):
    """Fetch, parse, cache and report on a group of RSS feeds.

    start() builds a single Deferred chain that walks through the feeds of
    one group in order.  Every step of the chain is followed by an errback
    that calls gotError(); gotError() returns None, which puts the chain back
    on its callback path with a None result.  The processing steps
    (parseFeed, memoize, workOnPage) therefore treat a None input as "the
    previous step failed" and pass it through untouched instead of parsing or
    caching it.

    Attributes:
        parsed      -- counter starting at 1, incremented by stopWorking()
        with_errors -- number of times gotError() has run
        error_list  -- the extra_args of every error seen so far
        END_VALUE   -- assigned from outside (FeederFactory sets it to the
                       number of feeds); stopWorking() stops the reactor
                       once parsed exceeds it
        factory     -- assigned from outside by FeederFactory

    print is always called with one pre-formatted argument so the output is
    the same whether print is a statement or a function.
    """

    def __init__(self):
        self.parsed = 1
        self.with_errors = 0
        self.error_list = []

    def isCached(self, site):
        """Tell whether the feed at site[0] has a fresh enough cache entry.

        site is one entry of the feed list; only site[0] (the URL) is used.
        Returns True when the URL is cached and the entry is younger than
        INTER_QUERY_TIME seconds, False otherwise.
        """
        entry = cache.get(site[0])
        if not entry:
            # Never downloaded: nothing to compare against.
            return False
        # entry is (TIMESTAMP, parsed_feed)
        return time.time() - entry[0] < INTER_QUERY_TIME

    def gotError(self, traceback, extra_args):
        """Errback: report the failure, record it and let the chain go on.

        traceback is whatever the Deferred passed to the errback; extra_args
        is the (url, stage) tuple given when the errback was registered.
        Returns None on purpose so processing continues with the next step.
        """
        print("%s %s" % (traceback, extra_args))
        self.with_errors += 1
        self.error_list.append(extra_args)
        print("=" * 20)
        print("Trying to go on...")

    def getPageFromMemory(self, data, key=None):
        """Return an already-fired Deferred holding the cached feed for key.

        data is the result of the previous chain step and is ignored.
        Raises KeyError when key is not in the cache (the errback registered
        after this step reports it) rather than indexing into the key
        itself, which is what cache.get(key, key)[1] used to do on a miss.
        """
        print("Getting from memory...")
        # cache[key] is (TIMESTAMP, parsed_feed); hand back the feed.
        return defer.succeed(cache[key][1])

    def parseFeed(self, feed):
        """Parse the downloaded feed text with feedparser.

        Returns the structure produced by feedparser.parse(), or None when
        feed is None (the download step failed and was already reported).
        """
        if feed is None:
            # Do not turn a failed download into the parsed text "None".
            return None
        print("parsing...")
        try:
            # Cheap "is this a string?" probe; anything else is stringified.
            feed + ''
        except TypeError:
            feed = str(feed)
        parsed = feedparser.parse(_StringIO.StringIO(feed))
        print("parsed feed")
        return parsed

    def memoize(self, feed, addr):
        """Store the parsed feed in the cache under addr and return it.

        feed is the structure returned by feedparser.parse(); addr is the
        address the feed was fetched from.  A None feed (an earlier step
        failed) is not cached and None is returned.
        """
        if feed is None:
            return None
        print("Memoizing %s ..." % (addr,))
        cache[addr] = (time.time(), feed)
        return feed

    def workOnPage(self, parsed_feed, addr):
        """Print a short summary of the parsed feed and return it unchanged.

        addr is the feed address (currently unused here).  A None
        parsed_feed (an earlier step failed) is passed through silently so
        the same failure is not reported a second time.
        """
        if parsed_feed is None:
            return None
        print("-" * 20)
        print("finished retrieving")
        print("Feed Version: %s" % (parsed_feed.get('version', 'Unknown'),))

        chan = parsed_feed.get('channel', None)
        if chan:
            print(chan.get('title', ''))
            #print chan.get('link', '')
            #print chan.get('tagline', '')
            #print chan.get('description','')
        print("-" * 20)
        #
        #  Uncomment the following if you want to print the feed items
        #
        #items = parsed_feed.get('items', None)
        #if items:
        #    for item in items:
        #        print '\tTitle: ', item.get('title','')
        #        print '\tDate: ', item.get('date', '')
        #        print '\tLink: ', item.get('link', '')
        #        print '\tDescription: ', item.get('description', '')
        #        print '\tSummary: ', item.get('summary','')
        #        print "-"*20
        #print "got",addr
        #print "="*40
        return parsed_feed

    def stopWorking(self, data=None):
        """Count one finished feed and stop the reactor after the last one.

        This exists only for stand-alone testing: once self.parsed exceeds
        self.END_VALUE the collected errors are printed and reactor.stop()
        is called.  data is the previous chain result and is ignored.
        """
        print("Closing connection number %d..." % (self.parsed,))
        print("=-" * 20)

        self.parsed += 1
        print("%s %s" % (self.parsed, self.END_VALUE))
        if self.parsed > self.END_VALUE:
            print("Closing all...")
            for failed in self.error_list:
                print(failed)
            print(len(self.error_list))
            reactor.stop()

    def getPage(self, data, args):
        """Start downloading the URL in args; returns client.getPage's result.

        data is the previous chain result and is ignored.
        """
        return client.getPage(args, timeout=TIMEOUT)

    def printStatus(self, data=None):
        """Announce that a new group of feeds is being started."""
        print("Starting feed group...")

    def start(self, data=None, std_alone=True):
        """Queue every feed of the group data on one Deferred chain.

        data is an iterable of feed entries whose first item is the URL;
        None is treated as an empty group.  When std_alone is true,
        stopWorking() is chained after every feed and nothing is returned;
        otherwise the Deferred for the whole group is returned.

        Note that isCached() is evaluated while the chain is being built,
        not at the moment the chain reaches that feed.
        """
        d = defer.succeed(self.printStatus())
        for feed in data or ():
            url = feed[0]
            if self.isCached(feed):
                d.addCallback(self.getPageFromMemory, url)
                d.addErrback(self.gotError, (url, 'getting from memory'))
            else:
                # Not cached (or too old): download, parse, then memoize.
                d.addCallback(self.getPage, url)
                d.addErrback(self.gotError, (url, 'getting'))

                d.addCallback(self.parseFeed)
                d.addErrback(self.gotError, (url, 'parsing'))

                d.addCallback(self.memoize, url)
                d.addErrback(self.gotError, (url, 'memoizing'))

            # With the parsed structure in hand, present it.
            d.addCallback(self.workOnPage, url)
            d.addErrback(self.gotError, (url, 'working on page'))

            # Testing only: count the feed and stop after the last one.
            if std_alone:
                d.addCallback(self.stopWorking)
                d.addErrback(self.gotError, (url, 'while stopping'))
        if not std_alone:
            return d

class FeederFactory(protocol.ClientFactory):
    """Split the feed list into groups and hand each group to FeederProtocol.

    With std_alone=True the factory starts downloading every feed as soon as
    it is constructed.  With std_alone=False nothing happens until start()
    is called, and start() returns a Deferred for the whole batch.
    """

    # NOTE(review): this single FeederProtocol instance is created at class
    # definition time and is shared by every FeederFactory, so its counters
    # and error list are shared too.
    protocol = FeederProtocol()

    def __init__(self, std_alone=False):
        self.feeds = self.getFeeds()
        self.std_alone = std_alone

        self.protocol.factory = self
        self.protocol.END_VALUE = len(self.feeds) # this is just for testing

        if std_alone:
            self.start(self.feeds)

    def start(self, addresses):
        """Start downloading addresses in at most DEFERRED_GROUPS groups.

        addresses is a sequence of feed entries.  Feeds are dealt round-robin
        into min(len(addresses), DEFERRED_GROUPS) groups, so with few feeds
        every feed gets its own group.

        Returns None in stand-alone mode.  Otherwise returns a
        defer.DeferredList over the Deferreds of ALL groups; the previous
        code returned from inside the loop and so only ever started the
        first group.
        """
        addresses = list(addresses)
        group_count = min(len(addresses), DEFERRED_GROUPS)
        # Slice i::group_count puts address number k into group k % group_count.
        url_groups = [addresses[i::group_count] for i in range(group_count)]

        pending = []
        for group in url_groups:
            pending.append(self.protocol.start(group, self.std_alone))

        if not self.std_alone:
            return defer.DeferredList(pending)

    def getFeeds(self, where=None):
        """Return the feeds to download.

        This is used for a COMPLETE refresh of the feeds, or for testing.
        Without where the module-level rss_feeds list is returned; any other
        source is not implemented and yields None.
        """
        if not where:
            # We don't have a database, so use the local variable rss_feeds
            return rss_feeds
        return None
        
# Script entry point: constructing the factory with std_alone=True makes it
# queue every feed right away (see FeederFactory.__init__); reactor.run()
# then runs the Twisted reactor.
if __name__=="__main__":
    f = FeederFactory(std_alone=True)
    reactor.run()

It's made on top of Twisted Matrix with the aid of Universal Feed Parser from Mark Pilgrim.

It has the following features:
- Easily set the number of parallel connections used to download the feeds
- Easily set the timeout for each feed request
- Easily set the max feed age (used for the memoizing feature)

From some tests I've been doing, it takes 6 minutes to download and parse over 730 feeds, which is less than half a second for each feed.

I've already submitted it to the Straw developers, who replied asking me to help them integrate this engine into the new version of the Straw RSS reader.

I've also written a very simple web interface with Nevow (www.nevow.com), which is available in the complete package from http://xoomer.virgilio.it/dialtone/rss-aggregator-web-v0.1.tar.bz2

1 comment

Harry Fuecks 12 years, 2 months ago  # | flag

Conditional HTTP GETs. Technique described here: http://fishbowl.pastiche.org/2002/10/21/http_conditional_get_for_rss_hackers

Script found here http://www.phppatterns.com/docs/develop/twisted_aggregator - essential bit below;

class ConditionalHTTPPageGetter(HTTPPageGetter):
    """Page getter that cooperates with its factory on conditional GETs.

    On a 200 response the Last-Modified header (if any) is handed to the
    factory; on a 304 the factory is told nothing changed and the
    connection is dropped.
    """

    def handleStatus_200(self):
        # If we're good, try recording the last-modified header
        if 'last-modified' in self.headers:
            self.factory.lastModified(self.headers['last-modified'])

    def handleStatus_304(self):
        # Not modified: notify the factory, then close the connection
        self.factory.notModified()
        self.transport.loseConnection()

class ConditionalHTTPClientFactory(HTTPClientFactory):
    """HTTP client factory that remembers Last-Modified between requests.

    For every URL a small file named after the MD5 hex digest of the URL is
    kept in cacheDir.  Its first line is the Last-Modified value of the
    previous successful response; when the file exists that value is sent
    back as an If-Modified-Since request header so the server can answer
    304 Not Modified.
    """

    protocol = ConditionalHTTPPageGetter

    def __init__(self, cacheDir, url, method='GET', postdata=None, headers=None,
                 agent="Twisted ConditionalPageGetter", timeout=0, cookies=None,
                 followRedirect=1):
        """Prepare the request, adding If-Modified-Since when one is cached.

        cacheDir is the directory holding the per-URL timestamp files; the
        remaining arguments are forwarded to HTTPClientFactory.__init__.
        The caller's headers dict is copied, never modified.
        """
        self.cachefile = os.path.join(cacheDir, self.getHashForUrl(url))

        if os.path.exists(self.cachefile):
            with open(self.cachefile) as stamp_file:
                lastModified = stamp_file.readline().strip()

            # A conditional GET is requested with If-Modified-Since;
            # Last-Modified is only meaningful as a response header.
            headers = dict(headers or {})
            headers['If-Modified-Since'] = lastModified

        HTTPClientFactory.__init__(self, url, method=method, postdata=postdata,
                headers=headers, agent=agent, timeout=timeout, cookies=cookies,
                followRedirect=followRedirect
                )

        self.waiting = 1
        self.deferred = defer.Deferred()

    def lastModified(self, modtime):
        """Persist the Last-Modified value of the response just received.

        Only modtime[0] is written -- presumably modtime is the list of
        values received for the header; verify against the protocol.
        """
        with open(self.cachefile, 'w') as stamp_file:
            stamp_file.write(modtime[0])

    def getHashForUrl(self, url):
        """Return the MD5 hex digest of url, used as the cache file name."""
        import hashlib

        if not isinstance(url, bytes):
            # hashlib only accepts bytes
            url = url.encode('utf-8')
        return hashlib.md5(url).hexdigest()

    def notModified(self):
        """Called by the protocol on a 304: stop waiting for a page.

        NOTE(review): self.deferred is not fired here, so a caller waiting
        on it will apparently never be notified of a 304 -- confirm whether
        that is intended.
        """
        if self.waiting:
            self.waiting = 0

def conditionalGetPage(cacheDir, url, contextFactory=None, *args, **kwargs):
    """Fetch url through a ConditionalHTTPClientFactory.

    cacheDir, url and any extra positional/keyword arguments are passed to
    ConditionalHTTPClientFactory.  For https URLs the connection is made
    with connectSSL, using contextFactory or a fresh
    ssl.ClientContextFactory() when none is given; every other scheme uses
    connectTCP.  Returns the factory's deferred.
    """
    url_scheme, hostname, port_number, _path = client._parse(url)
    page_factory = ConditionalHTTPClientFactory(cacheDir, url, *args, **kwargs)

    if url_scheme != 'https':
        reactor.connectTCP(hostname, port_number, page_factory)
        return page_factory.deferred

    # Imported lazily so plain-HTTP use does not need SSL support.
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(hostname, port_number, page_factory, contextFactory)
    return page_factory.deferred