import os, sys, marshal, glob, thread # Filename used for index files, must not contain numbers INDEX_FILENAME = 'index' # Exception thrown when calling get() on an empty queue class Empty(Exception): pass class PersistentQueue: def __init__(self, name, cache_size=512, marshal=marshal): """ Create a persistent FIFO queue named by the 'name' argument. The number of cached queue items at the head and tail of the queue is determined by the optional 'cache_size' parameter. By default the marshal module is used to (de)serialize queue items, but you may specify an alternative serialize module/instance with the optional 'marshal' argument (e.g. pickle). """ assert cache_size > 0, 'Cache size must be larger than 0' self.name = name self.cache_size = cache_size self.marshal = marshal self.index_file = os.path.join(name, INDEX_FILENAME) self.temp_file = os.path.join(name, 'tempfile') self.mutex = thread.allocate_lock() self._init_index() def _init_index(self): if not os.path.exists(self.name): os.mkdir(self.name) if os.path.exists(self.index_file): index_file = open(self.index_file) self.head, self.tail = map(lambda x: int(x), index_file.read().split(' ')) index_file.close() else: self.head, self.tail = 0, 1 def _load_cache(cache, num): name = os.path.join(self.name, str(num)) mode = 'rb+' if os.path.exists(name) else 'wb+' cachefile = open(name, mode) try: setattr(self, cache, self.marshal.load(cachefile)) except EOFError: setattr(self, cache, []) cachefile.close() _load_cache('put_cache', self.tail) _load_cache('get_cache', self.head) assert self.head < self.tail, 'Head not less than tail' def _sync_index(self): assert self.head < self.tail, 'Head not less than tail' index_file = open(self.temp_file, 'w') index_file.write('%d %d' % (self.head, self.tail)) index_file.close() if os.path.exists(self.index_file): os.remove(self.index_file) os.rename(self.temp_file, self.index_file) def _split(self): put_file = os.path.join(self.name, str(self.tail)) temp_file = open(self.temp_file, 'wb') self.marshal.dump(self.put_cache, temp_file) temp_file.close() if os.path.exists(put_file): os.remove(put_file) os.rename(self.temp_file, put_file) self.tail += 1 if len(self.put_cache) <= self.cache_size: self.put_cache = [] else: self.put_cache = self.put_cache[:self.cache_size] self._sync_index() def _join(self): current = self.head + 1 if current == self.tail: self.get_cache = self.put_cache self.put_cache = [] else: get_file = open(os.path.join(self.name, str(current)), 'rb') self.get_cache = self.marshal.load(get_file) get_file.close() try: os.remove(os.path.join(self.name, str(self.head))) except: pass self.head = current if self.head == self.tail: self.head = self.tail - 1 self._sync_index() def _sync(self): self._sync_index() get_file = os.path.join(self.name, str(self.head)) temp_file = open(self.temp_file, 'wb') self.marshal.dump(self.get_cache, temp_file) temp_file.close() if os.path.exists(get_file): os.remove(get_file) os.rename(self.temp_file, get_file) put_file = os.path.join(self.name, str(self.tail)) temp_file = open(self.temp_file, 'wb') self.marshal.dump(self.put_cache, temp_file) temp_file.close() if os.path.exists(put_file): os.remove(put_file) os.rename(self.temp_file, put_file) def __len__(self): """ Return number of items in queue. """ self.mutex.acquire() try: return (((self.tail-self.head)-1)*self.cache_size) + \ len(self.put_cache) + len(self.get_cache) finally: self.mutex.release() def sync(self): """ Synchronize memory caches to disk. """ self.mutex.acquire() try: self._sync() finally: self.mutex.release() def put(self, obj): """ Put the item 'obj' on the queue. """ self.mutex.acquire() try: self.put_cache.append(obj) if len(self.put_cache) >= self.cache_size: self._split() finally: self.mutex.release() def get(self): """ Get an item from the queue. Throws Empty exception if the queue is empty. """ self.mutex.acquire() try: if len(self.get_cache) > 0: return self.get_cache.pop(0) else: self._join() if len(self.get_cache) > 0: return self.get_cache.pop(0) else: raise Empty finally: self.mutex.release() def close(self): """ Close the queue. Implicitly synchronizes memory caches to disk. No further accesses should be made through this queue instance. """ self.mutex.acquire() try: self._sync() if os.path.exists(self.temp_file): try: os.remove(self.temp_file) except: pass finally: self.mutex.release() ## Tests if __name__ == "__main__": ELEMENTS = 1000 p = PersistentQueue('test', 10) print 'Enqueueing %d items, cache size = %d' % (ELEMENTS, p.cache_size) for a in range(ELEMENTS): p.put(str(a)) p.sync() print 'Queue length (using __len__):', len(p) print 'Dequeueing %d items' % (ELEMENTS/2) for a in range(ELEMENTS/2): p.get() print 'Queue length (using __len__):', len(p) print 'Dequeueing %d items' % (ELEMENTS/2) for a in range(ELEMENTS/2): p.get() print 'Queue length (using __len__):', len(p) p.sync() p.close()