import collections
import functools
from itertools import ifilterfalse
from heapq import nsmallest
from operator import itemgetter
class Counter(dict):
'Mapping where default values are zero'
def __missing__(self, key):
return 0
def lru_cache(maxsize=100):
'''Least-recently-used cache decorator.
Arguments to the cached function must be hashable.
Cache performance statistics stored in f.hits and f.misses.
Clear the cache with f.clear().
http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
'''
maxqueue = maxsize * 10
def decorating_function(user_function,
len=len, iter=iter, tuple=tuple, sorted=sorted, KeyError=KeyError):
cache = {} # mapping of args to results
queue = collections.deque() # order that keys have been used
refcount = Counter() # times each key is in the queue
sentinel = object() # marker for looping around the queue
kwd_mark = object() # separate positional and keyword args
# lookup optimizations (ugly but fast)
queue_append, queue_popleft = queue.append, queue.popleft
queue_appendleft, queue_pop = queue.appendleft, queue.pop
@functools.wraps(user_function)
def wrapper(*args, **kwds):
# cache key records both positional and keyword args
key = args
if kwds:
key += (kwd_mark,) + tuple(sorted(kwds.items()))
# record recent use of this key
queue_append(key)
refcount[key] += 1
# get cache entry or compute if not found
try:
result = cache[key]
wrapper.hits += 1
except KeyError:
result = user_function(*args, **kwds)
cache[key] = result
wrapper.misses += 1
# purge least recently used cache entry
if len(cache) > maxsize:
key = queue_popleft()
refcount[key] -= 1
while refcount[key]:
key = queue_popleft()
refcount[key] -= 1
del cache[key], refcount[key]
# periodically compact the queue by eliminating duplicate keys
# while preserving order of most recent access
if len(queue) > maxqueue:
refcount.clear()
queue_appendleft(sentinel)
for key in ifilterfalse(refcount.__contains__,
iter(queue_pop, sentinel)):
queue_appendleft(key)
refcount[key] = 1
return result
def clear():
cache.clear()
queue.clear()
refcount.clear()
wrapper.hits = wrapper.misses = 0
wrapper.hits = wrapper.misses = 0
wrapper.clear = clear
return wrapper
return decorating_function
def lfu_cache(maxsize=100):
'''Least-frequenty-used cache decorator.
Arguments to the cached function must be hashable.
Cache performance statistics stored in f.hits and f.misses.
Clear the cache with f.clear().
http://en.wikipedia.org/wiki/Least_Frequently_Used
'''
def decorating_function(user_function):
cache = {} # mapping of args to results
use_count = Counter() # times each key has been accessed
kwd_mark = object() # separate positional and keyword args
@functools.wraps(user_function)
def wrapper(*args, **kwds):
key = args
if kwds:
key += (kwd_mark,) + tuple(sorted(kwds.items()))
use_count[key] += 1
# get cache entry or compute if not found
try:
result = cache[key]
wrapper.hits += 1
except KeyError:
result = user_function(*args, **kwds)
cache[key] = result
wrapper.misses += 1
# purge least frequently used cache entry
if len(cache) > maxsize:
for key, _ in nsmallest(maxsize // 10,
use_count.iteritems(),
key=itemgetter(1)):
del cache[key], use_count[key]
return result
def clear():
cache.clear()
use_count.clear()
wrapper.hits = wrapper.misses = 0
wrapper.hits = wrapper.misses = 0
wrapper.clear = clear
return wrapper
return decorating_function
if __name__ == '__main__':
@lru_cache(maxsize=20)
def f(x, y):
return 3*x+y
domain = range(5)
from random import choice
for i in range(1000):
r = f(choice(domain), choice(domain))
print(f.hits, f.misses)
@lfu_cache(maxsize=20)
def f(x, y):
return 3*x+y
domain = range(5)
from random import choice
for i in range(1000):
r = f(choice(domain), choice(domain))
print(f.hits, f.misses)
Diff to Previous Revision
--- revision 5 2010-07-30 19:44:38
+++ revision 6 2010-08-01 01:19:23
@@ -1,6 +1,8 @@
import collections
import functools
from itertools import ifilterfalse
+from heapq import nsmallest
+from operator import itemgetter
class Counter(dict):
'Mapping where default values are zero'
@@ -12,6 +14,7 @@
Arguments to the cached function must be hashable.
Cache performance statistics stored in f.hits and f.misses.
+ Clear the cache with f.clear().
http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
'''
@@ -22,6 +25,7 @@
queue = collections.deque() # order that keys have been used
refcount = Counter() # times each key is in the queue
sentinel = object() # marker for looping around the queue
+ kwd_mark = object() # separate positional and keyword args
# lookup optimizations (ugly but fast)
queue_append, queue_popleft = queue.append, queue.popleft
@@ -32,7 +36,7 @@
# cache key records both positional and keyword args
key = args
if kwds:
- key += tuple(sorted(kwds.items()))
+ key += (kwd_mark,) + tuple(sorted(kwds.items()))
# record recent use of this key
queue_append(key)
@@ -65,13 +69,70 @@
iter(queue_pop, sentinel)):
queue_appendleft(key)
refcount[key] = 1
- assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues())
+
return result
+
+ def clear():
+ cache.clear()
+ queue.clear()
+ refcount.clear()
+ wrapper.hits = wrapper.misses = 0
+
wrapper.hits = wrapper.misses = 0
+ wrapper.clear = clear
return wrapper
return decorating_function
+
+def lfu_cache(maxsize=100):
+ '''Least-frequenty-used cache decorator.
+
+ Arguments to the cached function must be hashable.
+ Cache performance statistics stored in f.hits and f.misses.
+ Clear the cache with f.clear().
+ http://en.wikipedia.org/wiki/Least_Frequently_Used
+
+ '''
+ def decorating_function(user_function):
+ cache = {} # mapping of args to results
+ use_count = Counter() # times each key has been accessed
+ kwd_mark = object() # separate positional and keyword args
+
+ @functools.wraps(user_function)
+ def wrapper(*args, **kwds):
+ key = args
+ if kwds:
+ key += (kwd_mark,) + tuple(sorted(kwds.items()))
+ use_count[key] += 1
+
+ # get cache entry or compute if not found
+ try:
+ result = cache[key]
+ wrapper.hits += 1
+ except KeyError:
+ result = user_function(*args, **kwds)
+ cache[key] = result
+ wrapper.misses += 1
+
+ # purge least frequently used cache entry
+ if len(cache) > maxsize:
+ for key, _ in nsmallest(maxsize // 10,
+ use_count.iteritems(),
+ key=itemgetter(1)):
+ del cache[key], use_count[key]
+
+ return result
+
+ def clear():
+ cache.clear()
+ use_count.clear()
+ wrapper.hits = wrapper.misses = 0
+
+ wrapper.hits = wrapper.misses = 0
+ wrapper.clear = clear
+ return wrapper
+ return decorating_function
if __name__ == '__main__':
@@ -86,3 +147,14 @@
r = f(choice(domain), choice(domain))
print(f.hits, f.misses)
+
+ @lfu_cache(maxsize=20)
+ def f(x, y):
+ return 3*x+y
+
+ domain = range(5)
+ from random import choice
+ for i in range(1000):
+ r = f(choice(domain), choice(domain))
+
+ print(f.hits, f.misses)