A class for maintaining a cache of limited size, keeping the most recently used entries. This class is designed to operate in nearly constant time as the size of the cache varies.
from UserDict import DictMixin
import re
class InternalLogicError(RuntimeError):
    """Raised when the cache's internal linked-list invariants are violated."""

    def __init__(self, msg):
        # Keep the offending message so __str__ can report it.
        self.value = msg

    def __str__(self):
        return repr(self.value)
class Node:
    """One element of an intrusive doubly linked list, pairing a key with a value.

    The list itself is implicit: each node only knows its neighbours.  The
    owner of the list (LRUCache) is responsible for tracking the head and
    tail pointers; these methods never update them.
    """

    def __init__(self, key, value):
        self.prior = None   # neighbour closer to the head
        self.next = None    # neighbour closer to the tail
        self.key = key
        self.value = value

    def elide(self):
        "Remove a node from the doubly linked list. (It remains in the dict.)"
        before, after = self.prior, self.next
        if before is None and after is None:
            # Isolated node: nothing to unlink.
            return
        if before is None:
            # Head of the list.
            if after.prior is not self:
                raise InternalLogicError("self.next.prior != self and != None")
            after.prior = None
            self.next = None
            return
        if after is None:
            # Tail of the list.
            if before.next is not self:
                raise InternalLogicError("self.prior.next != self and != None")
            before.next = None
            self.prior = None
            return
        # Interior node: splice the two neighbours together.
        before.next = after
        after.prior = before
        self.next = None
        self.prior = None

    def addHead(self, first):
        "Link this node in front of *first*, making it the new head (prior == None)."
        self.prior = None
        if first is None:
            # The list was empty; this node stands alone.
            self.next = None
        else:
            self.next = first
            first.prior = self

    def delTail(self):
        "Unlink this node from the tail of the list."
        if self.next is not None:
            raise InternalLogicError \
                ("self.next != None, but it must be at the tail of the list")
        # NOTE: self.prior is deliberately left pointing at the old neighbour,
        # mirroring the original behaviour; callers discard the node anyway.
        self.prior.next = None

    def __str__(self):
        return "Node(%s, %s)" % (self.key, self.value)

    def portOut(self):
        "Serialize this node; its position on the list is forgotten."
        return "<Node(%r, %r)>" % (self.key, self.value)
class LRUCache(DictMixin):
    """
    A dictionary-like cache of limited size that discards the least recently
    used entries when it overflows.  A doubly linked list of Node objects
    records access order (most recent at self._first, least recent at
    self._last); self._basket maps each key to its Node, so lookup,
    insertion and eviction all run in nearly constant time.

    Reading an absent key returns None instead of raising KeyError, so
    callers are expected to fall back to a slower backing store on a miss.

    Warning: This has not been tested much beyond the test sample included at the end.
    """
    def __init__(self, maxSize = 255, chunk = 1, nodeDict = None):
        """
        maxSize  -- how large the cache is allowed to grow
        chunk    -- how many entries are freed from the cache in one cycle
                    (must be smaller than maxSize)
        nodeDict -- optional plain dict used to pre-populate the cache
        Raises ValueError if chunk >= maxSize.
        """
        if chunk >= maxSize:
            raise ValueError ("invalid parameters. chunk must be smaller than maxSize")
        self._maxSize = maxSize   # how large is the cache allowed to grow
        self._chunk = chunk       # how many should be freed from cache in one cycle
        self._basket = {}         # key -> Node
        self._first = None        # head of the access sequence list (most recent)
        self._last = None         # tail of the access sequence list (least recent)
        if nodeDict is not None:
            # Pre-load without triggering eviction (original behaviour).
            for key in nodeDict.keys():
                node = Node(key, nodeDict[key])
                if self._first is None:
                    self._first = node
                    self._last = node
                else:
                    node.addHead(self._first)
                    self._first = node
                self._basket[key] = node

    def _touch(self, node):
        "Promote an existing node to the head of the access list (private helper)."
        if node is self._first:
            return
        if node is self._last:
            # Bug fix: keep the tail pointer valid when the tail is promoted;
            # the old code left self._last pointing at an elided node.
            self._last = node.prior
        node.elide()
        node.addHead(self._first)
        self._first = node

    def __getitem__(self, key):
        """
        Return the cached value for key, promoting it to most recently used.
        Returns None on a cache miss (callers test for None).
        Bug fix: the previous version referenced a nonexistent self._list and
        treated nodes as sequences; it could never have worked.
        """
        if key not in self._basket:
            return None
        node = self._basket[key]
        self._touch(node)
        return node.value

    def __setitem__(self, key, value):
        """
        Insert or update key, making it the most recently used entry.  None
        keys are silently ignored.  When the cache exceeds maxSize, the least
        recently used entries are evicted down to maxSize - chunk.
        """
        if key is None: return
        if self._first is None:
            # First entry ever: it is both head and tail.
            node = Node(key, value)
            self._first = node
            self._last = node
            self._basket[key] = node
            return
        if key in self._basket:
            node = self._basket[key]
            # Bug fix: the stored value was previously never updated.
            node.value = value
            self._touch(node)
            return
        # New key: evict before inserting, so the entry being added cannot
        # itself be evicted.
        if len(self._basket) > self._maxSize:
            while (len(self._basket) >= self._maxSize - self._chunk):
                tail = self._last
                self._last = tail.prior
                tail.delTail()
                del self._basket[tail.key]
        node = Node (key, value)
        node.addHead(self._first)
        self._first = node
        self._basket[key] = node

    def __delitem__(self, key):
        "Remove key from the cache; missing keys are silently ignored."
        if key not in self._basket: return
        node = self._basket[key]
        # Bug fix: keep the head/tail pointers valid when an end node is
        # removed; the old code left them pointing at the deleted node.
        if node is self._first:
            self._first = node.next
        if node is self._last:
            self._last = node.prior
        node.elide()
        del self._basket[key]

    def __contains__(self, key):
        return key in self._basket

    def __iter__(self):
        # iter(dict) yields keys on both Python 2 and 3 (iterkeys is 2-only).
        return iter(self._basket)

    def __repr__(self):
        # Produces a string that eval()s back into an equivalent cache.
        ret = "LRUCache(%s, %s" % (self._maxSize, self._chunk)
        if not self._basket:
            return ret + ")"
        r = ["%r : %r" % (node.key, node.value)
             for node in self._basket.values()]
        return ret + ", {" + ", ".join(r) + "})"

    def iteritems(self):
        " Warning: This is not counted as an access by the cache "
        for key in self._basket.keys():
            if key is None: continue   # defensive; None keys are never stored
            node = self._basket[key]
            yield node.key, node.value

    def keys(self):
        return self._basket.keys()

    def portOut(self):
        "Serialize to a string; the order of most recent access is not preserved."
        ret = "<LRUCache(%s, %s) :: " % (self._maxSize, self._chunk)
        r = ["%r : %r" % (node.key, node.value)
             for node in self._basket.values()]
        return (ret + (", ".join(r)) + " >")

    @classmethod
    def portIn(cls, defStr):
        """ defStr should have been produced by portOut.
        Warning: You *must* be able to trust the source of this string!!
        (eval is applied to it -- never call this on untrusted input.)
        Warning: On porting in, the order of most recent access is randomized.
        """
        if not ((defStr[0] == "<") and (defStr[-1] == ">")):
            raise ValueError ("invalid parameter. Malformed class description.")
        ds = defStr[1:-2].strip()
        i = ds.find("::")
        if (i < 0):
            # No payload: just the constructor call.
            return eval(ds)   # SECURITY: eval of externally supplied text
        itm = eval(ds[0:i])   # SECURITY: eval of externally supplied text
        vals = eval("{" + ds[i + 2:].strip() + "}")
        for key in vals.keys():
            # Bug fix: insert through __setitem__ so the linked list and the
            # basket stay consistent (the old code never updated _first/_last,
            # leaving a corrupt list).
            itm[key] = vals[key]
        return itm
if __name__ == "__main__":
    # Smoke test (Python 2 syntax): exercises insertion with eviction,
    # deletion, update, serialization (portOut/portIn) and repr round-trip.
    lru = LRUCache(20, 7)
    for i in range(90):
        # NOTE: keys here are *strings*, so the integer keys used below
        # (78 and 10) refer to different, initially absent entries.
        lru[str(i)] = i * i + 0.1
    print "lru = ", lru
    del lru[78]      # no-op: int 78 was never stored ("78" was)
    lru[10] = 100    # adds a new entry under the int key 10
    print "lru mod = ", lru
    d = lru.portOut()
    print "portOut = ", d
    lru2 = LRUCache.portIn(d)   # round-trip via the portable string form
    print "lru2 = ", lru2
    print repr(lru2)
    lru3 = eval(repr(lru2) )    # repr is itself a valid constructor call
    print "lru3 = ", lru3
|
This is an attempt to make an LRU cache that operates in constant time as the number of entries increases. It should be successful to the extent that Python dicts operate in constant time. (I.e., not completely, but close.) It allows one to hold in RAM a limited number of entries that are either recent or commonly used. It should usually be paired with a backing store that allows slower access to data which has been silently discarded as the cache overflowed.
There are three known problems with this implementation: 1) Unfortunately, this required implementing a doubly linked list in Python. This may degrade the speed, but it avoids converting heaps back and forth between lists, except for the numerous references to the keys() method of the dict class. 2) It should be possible to iterate through the hash table without the references to dict.keys(), but I haven't fixed this, as LRU caches aren't intended for iterating through or storing. 3) This code has seen only a very limited amount of testing.
Two comments:
Your source seems to have broken indentation.
Was already posted (originally almost 4 years ago now): http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/252524