Welcome, guest | Sign In | My Account | Store | Cart
from twisted.internet import reactor, protocol, defer
from twisted.web import client
import feedparser, time, sys

# You can get this file from http://xoomer.virgilio.it/dialtone/out.py
# Since it's only a list of URLs made in this way:
# out = [  ( 'URL', 'EXTRA_INFOS'),
#          ( 'URL', 'EXTRA_INFOS')
#          ...
#        ]
import out

    import cStringIO as _StringIO
except ImportError:
    import StringIO as _StringIO

rss_feeds = out.rss_feed # This is the HUGE feed list  (730 feeds)

DEFERRED_GROUPS = 60  # Number of simultaneous connections
INTER_QUERY_TIME = 300 # Max Age (in seconds) of each feed in the cache
TIMEOUT = 30 # Timeout in seconds for the web request

# This dict structure will be the following:
# { 'URL': (TIMESTAMP, value) }
cache = {}

class FeederProtocol(object):
    def __init__(self):
        self.parsed = 1
        self.with_errors = 0
        self.error_list = []
    def isCached(self, site):
        # Try to get the tuple (TIMESTAMP, FEED_STRUCT) from the dict if it has
        # already been downloaded. Otherwise assign None to already_got
        already_got = cache.get(site[0], None)

        # Ok guys, we got it cached, let's see what we will do
        if already_got:
            # Well, it's cached, but will it be recent enough?
            elapsed_time = time.time() - already_got[0]
            # Woooohooo it is, elapsed_time is less than INTER_QUERY_TIME so I
            # can get the page from the memory, recent enough
            if elapsed_time < INTER_QUERY_TIME:
                return True
                # Uhmmm... actually it's a bit old, I'm going to get it from the
                # Net then, then I'll parse it and then I'll try to memoize it
                # again
                return False
            # Well... We hadn't it cached in, so we need to get it from the Net
            # now, It's useless to check if it's recent enough, it's not there.
            return False

    def gotError(self, traceback, extra_args):
        # An Error as occurred, print traceback infos and go on
        print traceback, extra_args
        self.with_errors += 1
        print "="*20
        print "Trying to go on..."
    def getPageFromMemory(self, data, key=None):
        # Getting the second element of the tuple which is the parsed structure
        # of the feed at address key, the first element of the tuple is the
        # timestamp
        print "Getting from memory..."
        return defer.succeed(cache.get(key,key)[1])

    def parseFeed(self, feed):
        # This is self explaining :)
        print "parsing..."
            parsed = feedparser.parse(_StringIO.StringIO(feed))
        except TypeError:
            parsed = feedparser.parse(_StringIO.StringIO(str(feed)))
        print "parsed feed"
        return parsed
    def memoize(self, feed, addr):
        # feed is the raw structure, just as returned from feedparser.parse()
        # while addr is the address from which the feed was got.
        print "Memoizing",addr,"..."
        if cache.get(addr, None):
            cache[addr] = (time.time(), feed)
            cache.setdefault(addr, (time.time(),feed))
        return feed
    def workOnPage(self, parsed_feed, addr):
        # As usual, addr is the feed address and file is the file in
        # which you can eventually save the structure.
        print "-"*20
        print "finished retrieving"
        print "Feed Version:",parsed_feed.get('version','Unknown')
        #  Uncomment the following if you want to print the feeds
        chan = parsed_feed.get('channel', None)
        if chan:
            print chan.get('title', '')
            #print chan.get('link', '')
            #print chan.get('tagline', '')
            #print chan.get('description','')
        print "-"*20
        #items = parsed_feed.get('items', None)
        #if items:
        #    for item in items:
        #        print '\tTitle: ', item.get('title','')
        #        print '\tDate: ', item.get('date', '')
        #        print '\tLink: ', item.get('link', '')
        #        print '\tDescription: ', item.get('description', '')
        #        print '\tSummary: ', item.get('summary','')
        #        print "-"*20
        #print "got",addr
        #print "="*40
        return parsed_feed
    def stopWorking(self, data=None):
        print "Closing connection number %d..."%(self.parsed,)
        print "=-"*20
        # This is here only for testing. When a protocol/interface will be
        # created to communicate with this rss-aggregator server, we won't need
        # to die after we parsed some feeds just one time.
        self.parsed += 1
        print self.parsed,  self.END_VALUE
        if self.parsed > self.END_VALUE:   #
            print "Closing all..."         #
            for i in self.error_list:      #  Just for testing sake
                print i                    #
            print len(self.error_list)     #
            reactor.stop()                 #

    def getPage(self, data, args):
        return client.getPage(args,timeout=TIMEOUT)

    def printStatus(self, data=None):
        print "Starting feed group..."
    def start(self, data=None, std_alone=True):
        d = defer.succeed(self.printStatus())
        for feed in data:
            # Now we start telling the reactor that it has
            # to get all the feeds one by one...
            cached = self.isCached(feed)
            if not cached: 
                # When the feed is not cached, it's time to
                # go and get it from the web directly
                d.addCallback(self.getPage, feed[0])
                d.addErrback(self.gotError, (feed[0], 'getting'))
                # Parse the feed and if there's some errors call self.gotError
                d.addErrback(self.gotError, (feed[0], 'parsing'))
                # Now memoize it, if there's some error call self.getError
                d.addCallback(self.memoize, feed[0])
                d.addErrback(self.gotError, (feed[0], 'memoizing'))
            else: # If it's cached
                d.addCallback(self.getPageFromMemory, feed[0])
                d.addErrback(self.gotError, (feed[0], 'getting from memory'))
            # When you get the raw structure you can work on it
            # to format in the best way you can think of.
            # For any error call self.gotError.
            d.addCallback(self.workOnPage, feed[0])
            d.addErrback(self.gotError, (feed[0], 'working on page'))
            # And when the for loop is ended we put 
            # stopWorking on the callback for the last 
            # feed gathered
            # This is only for testing purposes
            if std_alone:
                d.addErrback(self.gotError, (feed[0], 'while stopping'))
        if not std_alone:
            return d

class FeederFactory(protocol.ClientFactory):
    protocol = FeederProtocol()
    def __init__(self, std_alone=False):
        self.feeds = self.getFeeds()
        self.std_alone = std_alone
        self.protocol.factory = self
        self.protocol.END_VALUE = len(self.feeds) # this is just for testing

        if std_alone:

    def start(self, addresses):
        # Divide into groups all the feeds to download
        if len(addresses) > DEFERRED_GROUPS:
            url_groups = [[] for x in xrange(DEFERRED_GROUPS)]
            for i, addr in enumerate(addresses):
            url_groups = [[addr] for addr in addresses]
        for group in url_groups:
            if not self.std_alone:
                return self.protocol.start(group, self.std_alone)
                self.protocol.start(group, self.std_alone)

    def getFeeds(self, where=None):
        # This is used when you call a COMPLETE refresh of the feeds,
        # or for testing purposes
        #print "getting feeds"
        # This is to get the feeds we want
        if not where: # We don't have a database, then we use the local
                      # variabile rss_feeds
            return rss_feeds
        else: return None
if __name__=="__main__":
    f = FeederFactory(std_alone=True)


  • revision 2 (19 years ago)
  • previous revisions are not available