Welcome, guest | Sign In | My Account | Store | Cart

A class for maintaining a cache of limited size, with the most recently used versions being kept. This class is designed to operate in nearly constant time as the size of the cache varies.

Python, 218 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import re

try:
    from UserDict import DictMixin  # Python 2
except ImportError:
    # Python 3 removed UserDict.DictMixin; MutableMapping fills the same role.
    from collections.abc import MutableMapping as DictMixin

class InternalLogicError(RuntimeError):
    """Signal that an internal invariant of the cache's linked list is broken.

    The message passed to the constructor is kept in ``value``; ``str()`` of
    the exception is the ``repr()`` of that message.
    """

    def __init__(self, msg):
        # Initialise the RuntimeError base explicitly rather than implicitly.
        RuntimeError.__init__(self, msg)
        self.value = msg

    def __str__(self):
        return repr(self.value)


class Node(object):
    """One entry of a doubly linked list holding a ``key``/``value`` pair.

    ``prior`` and ``next`` reference the neighbouring nodes and are ``None``
    at the respective end of the list (or when the node is not linked).
    """

    def __init__(self, key, value):
        self.prior = None
        self.next = None
        self.key = key
        self.value = value

    def elide(self):
        """Remove this node from the doubly linked list.

        Only the list links are changed; any dict that references the node is
        left alone.  Afterwards both ``prior`` and ``next`` are ``None``.

        Raises:
            InternalLogicError: if this node is at one end of the list and
                its single neighbour does not point back at it.
        """
        before = self.prior
        after = self.next
        if before is None and after is None:
            # Not linked to anything: nothing to do.
            return
        if before is None:
            # This node is the head of the list.
            if after.prior is not self:
                raise InternalLogicError("self.next.prior != self and != None")
            after.prior = None
            self.next = None
            return
        if after is None:
            # This node is the tail of the list.
            if before.next is not self:
                raise InternalLogicError("self.prior.next != self and != None")
            before.next = None
            self.prior = None
            return
        # Interior node: splice the two neighbours together.
        before.next = after
        after.prior = before
        self.next = None
        self.prior = None

    def addHead(self, first):
        """Link this node in front of ``first`` so that it becomes the head.

        ``first`` is the current head of the list, or ``None`` for an empty
        list.  In both cases this node ends up with ``prior`` set to ``None``.
        """
        self.prior = None
        if first is None:
            # Empty list: the node stands alone.
            self.next = None
            return
        self.next = first
        first.prior = self

    def delTail(self):
        """Detach this node from the tail of the list.

        A node with no predecessor (the only node of its list) is accepted
        and simply left unlinked.

        Raises:
            InternalLogicError: if the node has a successor, i.e. it is not
                the tail.
        """
        if self.next is not None:
            raise InternalLogicError(
                "self.next != None, but it must be at the tail of the list")
        if self.prior is not None:
            self.prior.next = None
            # Drop the back-link too so the detached node holds no stale
            # reference into the list.
            self.prior = None

    def __str__(self):
        return "Node(%s, %s)" % (str(self.key), str(self.value))

    def portOut(self):
        """Return a textual form of the node; list position is not recorded."""
        return "<Node(%s, %s)>" % (repr(self.key), repr(self.value))


class LRUCache(DictMixin):
    """
    A class for maintaining a cache of limited size, with the most recently
    used entries being kept.  Entries live in a dict (``_basket``) that maps
    each key to a Node, and the nodes are chained in a doubly linked list
    ordered from most recently used (``_first``) to least recently used
    (``_last``), so lookups, updates and evictions need no list scans.

    Behaviour worth knowing:
      * Reading or writing a key moves it to the head of the access list.
      * Looking up a missing key returns None instead of raising KeyError.
      * Assigning to the key None is silently ignored.
      * Deleting a missing key is silently ignored.
      * When a new key arrives and the cache already holds ``maxSize``
        entries, the ``chunk`` least recently used entries are discarded
        first (at least one).

    Warning:  This has not been tested much beyond the test sample included
    at the end.
    """

    def __init__(self, maxSize=255, chunk=1, nodeDict=None):
        """Create a cache.

        maxSize  -- how large the cache is allowed to grow
        chunk    -- how many entries are freed from the cache in one cycle
        nodeDict -- optional mapping whose items pre-populate the cache

        Raises ValueError if chunk is not smaller than maxSize.
        """
        if chunk >= maxSize:
            raise ValueError("invalid parameters.  chunk must be smaller than maxSize")
        self._maxSize = maxSize
        self._chunk = chunk
        self._basket = {}
        self._first = None  # head of the access sequence list
        self._last = None   # tail of the access sequence list

        if nodeDict is not None:
            # Go through __setitem__ so the list ends and the size limit are
            # maintained in exactly one place.
            for key in nodeDict.keys():
                self[key] = nodeDict[key]

    # -- internal list maintenance -------------------------------------

    def _unlink(self, node):
        """Take node out of the access list, keeping _first/_last correct."""
        if node is self._first:
            self._first = node.next
        if node is self._last:
            self._last = node.prior
        node.elide()

    def _pushHead(self, node):
        """Make an unlinked node the most recently used entry."""
        node.addHead(self._first)
        self._first = node
        if self._last is None:
            # The list was empty, so the node is also the tail.
            self._last = node

    def _touch(self, node):
        """Record an access: move node to the head of the access list."""
        if node is not self._first:
            self._unlink(node)
            self._pushHead(node)

    def _evict(self):
        """Discard the ``chunk`` least recently used entries (at least one)."""
        for _ in range(max(1, self._chunk)):
            victim = self._last
            if victim is None:
                break
            self._unlink(victim)
            del self._basket[victim.key]

    def _nodesOldestFirst(self):
        """Yield the nodes from least to most recently used."""
        node = self._last
        while node is not None:
            yield node
            node = node.prior

    def _formatItems(self):
        """Return the entries as ``key : value`` reprs joined by commas."""
        return ", ".join(
            "%s : %s" % (repr(node.key), repr(node.value))
            for node in self._nodesOldestFirst())

    # -- mapping protocol ----------------------------------------------

    def __getitem__(self, key):
        """Return the value cached for key, or None when key is absent.

        A hit counts as an access and moves the entry to the head of the
        access list.
        """
        node = self._basket.get(key)
        if node is None:
            return None
        self._touch(node)
        return node.value

    def __setitem__(self, key, value):
        """Store value under key and mark it as most recently used.

        The key None is ignored.  Inserting a new key into a full cache
        first evicts the least recently used entries.
        """
        if key is None:
            return
        node = self._basket.get(key)
        if node is not None:
            # Existing entry: refresh the value and its position.
            node.value = value
            self._touch(node)
            return
        if len(self._basket) >= self._maxSize:
            self._evict()
        node = Node(key, value)
        self._pushHead(node)
        self._basket[key] = node

    def __delitem__(self, key):
        """Remove key from the cache; a missing key is silently ignored."""
        node = self._basket.pop(key, None)
        if node is not None:
            self._unlink(node)

    def __contains__(self, key):
        return key in self._basket

    def __iter__(self):
        return iter(self._basket)

    def __len__(self):
        return len(self._basket)

    def __repr__(self):
        head = "LRUCache(%s, %s" % (self._maxSize, self._chunk)
        if not self._basket:
            return head + ")"
        return head + ", {" + self._formatItems() + "})"

    def iteritems(self):
        """Yield (key, value) pairs.

        Warning:  This is not counted as an access by the cache.
        """
        # Iterate over a snapshot so callers may modify the cache meanwhile.
        for node in list(self._basket.values()):
            yield node.key, node.value

    def keys(self):
        return list(self._basket.keys())

    def portOut(self):
        """Return a string description that portIn can turn back into a cache.

        Entries are listed from least to most recently used.
        """
        head = "<LRUCache(%s, %s) :: " % (self._maxSize, self._chunk)
        return head + self._formatItems() + " >"

    @classmethod
    def portIn(cls, defStr):
        """Rebuild a cache from a string produced by portOut.

        Warning:  You *must* be able to trust the source of this string!!
                  Both halves of it are passed to eval().
        Warning:  The order of most recent access is only reproduced when
                  the evaluated dict literal iterates in written order;
                  otherwise it will be arbitrary.

        Raises ValueError if defStr is not wrapped in "<" and ">".
        """
        if not (defStr.startswith("<") and defStr.endswith(">")):
            raise ValueError("invalid parameter.  Malformed class description.")
        body = defStr[1:-1].strip()
        header, separator, items = body.partition("::")
        # SECURITY: eval() executes arbitrary expressions -- see the warning
        # in the docstring.  Never feed this untrusted input.
        cache = eval(header.strip())
        if not separator:
            return cache
        values = eval("{" + items.strip() + "}")
        for key in values.keys():
            # Insert through __setitem__ so head and tail stay consistent.
            cache[key] = values[key]
        return cache

if __name__ == "__main__":
    # Small demonstration / smoke test of the cache.
    lru = LRUCache(20, 7)
    for i in range(90):
        lru[str(i)] = i * i + 0.1

    # Single-argument print(...) calls give the same text on Python 2 and
    # also parse on Python 3.
    print("lru =  %s" % (lru,))
    # The keys above were inserted as strings, so the integer 78 is absent
    # and this delete is a silent no-op.
    del lru[78]
    # Likewise this adds a new integer key rather than replacing "10".
    lru[10] = 100
    print("lru mod =  %s" % (lru,))
    d = lru.portOut()
    print("portOut =  %s" % (d,))
    lru2 = LRUCache.portIn(d)
    print("lru2 =  %s" % (lru2,))
    print(repr(lru2))
    # repr() is written as a constructor call, so eval() rebuilds the cache.
    lru3 = eval(repr(lru2))
    print("lru3 =  %s" % (lru3,))
   

This is an attempt to make an LRUCache that operates in constant time as the number of cases increases. It should be successful to the extent that Python dicts operate in constant time. (I.e., not completely, but close.) It allows one to hold in ram a limited number of cases that are either recent or commonly used. It should usually have a backing store that allows slower access to data which has been silently discarded as the cache overflowed.

There are three known problems with this implementation: 1) Unfortunately, this required implementing a doubly-linked list in Python. This may degrade the speed, but it avoids converting heaps back and forth between lists, except for the numerous references to the keys() method of the dict class. 2) It should be possible to iterate through hash tables without the references to dict.keys(), but I haven't fixed this, as LRUCaches aren't intended for iterating through or for storing. 3) This code has seen only a very limited amount of testing.

1 comment

Josiah Carlson 16 years, 10 months ago  # | flag

Two comments:

  1. Your source seems to have broken indentation.

  2. Was already posted (originally almost 4 years ago now): http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/252524

Created by Charles Hixson on Mon, 28 May 2007 (PSF)
Python recipes (4591)
Charles Hixson's recipes (2)

Required Modules

  • (none specified)

Other Information and Tasks