This recipe provides a buffered stream that supports multiple forward-only readers. The buffer enables readers that are behind the front-runner to access values that have already been read from the stream. Values that are no longer accessible by any reader are cleared from the buffer.
import weakref
class EndOfStreamException(Exception):
    """Signals that a read asked for values past the end of the stream."""
class InvalidReaderException(Exception):
    """Signals a read from a position the buffer has already discarded."""
class BufferedStream:
    """A stream that supports multiple forward-only readers.

    Values pulled from the wrapped iterable are kept in ``buffer`` so that
    readers lagging behind the front-runner can still see them.  Positions
    are absolute stream offsets: ``buffer[0]`` holds the value at offset
    ``buffer_start`` and ``buffer_end`` is the offset one past the last
    buffered value.  Values that no registered reader can reach any more
    are dropped from the front of the buffer.
    """

    def __init__(self, generator):
        """Wrap *generator*, any iterable, as the source of values."""
        # Weak references to the readers handed out by createReader().
        self.readers = []
        self.generator = iter(generator)
        self.buffer = []
        self.buffer_end = 0
        self.buffer_start = 0
        # Set once the source iterable has raised StopIteration.
        self.eof = False

    def read(self, reader, count):
        """Return the next *count* values for *reader* and advance it.

        Raises:
            ValueError: *count* is negative (readers are forward-only).
            InvalidReaderException: the reader points before the start of
                the buffer, i.e. at values that were already discarded.
            EndOfStreamException: fewer than *count* values remain; the
                reader is left where it was.
        """
        if count < 0:
            raise ValueError("count must be non-negative")
        start = reader.index
        end = start + count
        if start < self.buffer_start:
            raise InvalidReaderException()
        if end > self.buffer_end:
            if self.eof:
                raise EndOfStreamException()
            self.populateBuffer(end)
        result = self.getBufferContents(start, end)
        reader.index = end
        self.trimBuffer(start)
        return result

    def populateBuffer(self, targetIndex):
        """Pull values from the source until ``buffer_end >= targetIndex``.

        Raises:
            EndOfStreamException: the source ran out first.  Values fetched
                before that point stay in the buffer and ``eof`` is set.
        """
        while self.buffer_end < targetIndex:
            try:
                # Built-in next() works on both Python 2.6+ and Python 3,
                # unlike the Python-2-only ``.next()`` method.
                nextValue = next(self.generator)
            except StopIteration:
                self.eof = True
                raise EndOfStreamException()
            self.buffer.append(nextValue)
            self.buffer_end += 1

    def trimBuffer(self, index):
        """Drop unneeded buffer contents after a reader has left *index*.

        Only a reader that was sitting at the very start of the buffer can
        free anything, so any other *index* returns immediately.
        """
        if index != self.buffer_start:
            return
        self._dropUnreachable()

    def _dropUnreachable(self):
        """Advance ``buffer_start`` to the slowest live, valid reader."""
        newBufferStart = self.buffer_end
        for readerRef in self.readers:
            reader = readerRef()
            if reader is None:
                # Referent already collected; its callback has not removed
                # the reference from the list yet.
                continue
            # Readers behind buffer_start can never read again (read() raises
            # InvalidReaderException for them), so they must not pin the
            # buffer or pull buffer_start backwards.
            if self.buffer_start <= reader.index < newBufferStart:
                newBufferStart = reader.index
        dropSize = newBufferStart - self.buffer_start
        if dropSize > 0:
            del self.buffer[:dropSize]
            self.buffer_start = newBufferStart

    def getBufferContents(self, start, end):
        """Return the buffered values for absolute offsets [start, end)."""
        offset = self.buffer_start
        return self.buffer[start - offset:end - offset]

    def createReader(self, index=0):
        """Create a reader at *index* and register it weakly.

        The stream only holds a weak reference, so dropping the last strong
        reference to the reader unregisters it through removeReader().
        """
        reader = Reader(self, index)
        self.readers.append(weakref.ref(reader, self.removeReader))
        return reader

    def removeReader(self, ref):
        """Weak-reference callback: forget *ref* and release its values.

        The buffer is re-trimmed here because the reader's own ``__del__``
        may run while its weak reference is still registered, in which case
        that trim still counts the dying reader and frees nothing.
        """
        self.readers.remove(ref)
        self._dropUnreachable()
class Reader:
def __init__(self, buffer, index=0):
self.buffer = buffer
self.index = index
def read(self, count):
return self.buffer.read(self, count)
def readChar(self):
return self.read(1)[0]
def clone(self):
return self.buffer.createReader(self.index)
def __del__(self):
self.buffer.trimBuffer(self.index)
|
It's sometimes useful to be able to backtrack when performing operations (like parsing) on an incoming stream. Using this implementation, one can clone a reader and send it forward into the stream while keeping a reader at the original position in reserve:
<pre>
stream = BufferedStream("Hello world")
reader1 = stream.createReader()
reader2 = reader1.clone()

if "".join(reader1.read(7)) == "Bonjour":
    print("French")
elif "".join(reader2.read(5)) == "Hello":
    print("English")
</pre>
Various useful kinds of nondeterminism can be simulated in this manner.