Welcome, guest | Sign In | My Account | Store | Cart

This recipe provides a buffered stream that supports multiple forward-only readers. The buffer enables readers that are behind the front-runner to access values that have already been read from the stream. Values that are no longer accessible by any reader are cleared from the buffer.

Python, 89 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import weakref

class EndOfStreamException(Exception):
  """Raised when a read needs more values than the underlying source provides."""

class InvalidReaderException(Exception):
  """Raised when a reader's position has already been dropped from the buffer."""

class BufferedStream:
  """A stream that supports multiple forward-only readers.

  Values pulled from the source are kept in ``buffer`` until no registered
  reader can still reach them.  ``buffer[0]`` is the value at absolute
  stream index ``buffer_start``; ``buffer_end`` is one past the last
  buffered index.
  """
  def __init__(self, generator):
      # Weak references to the readers made by createReader(); each ref's
      # callback (removeReader) unregisters the reader when it dies.
      self.readers = []
      # Any iterable works (e.g. a string), not only generators.
      self.generator = iter(generator)
      self.buffer = []
      self.buffer_end = 0
      self.buffer_start = 0
      # Set once the source has raised StopIteration.
      self.eof = False

  def read(self, reader, count):
      """Return the next `count` values for `reader` and advance it.

      Raises InvalidReaderException if the reader's position has already
      been trimmed from the buffer, and EndOfStreamException if the source
      ends before `count` values are available.  In both cases the reader
      is not advanced.
      """
      indexBeforeRead = reader.index
      indexAfterRead = reader.index + count
      if indexBeforeRead < self.buffer_start:
          raise InvalidReaderException()
      if indexAfterRead > self.buffer_end:
          if self.eof:
              raise EndOfStreamException()
          self.populateBuffer(indexAfterRead)
      result = self.getBufferContents(indexBeforeRead, indexAfterRead)
      reader.index = indexAfterRead
      # Only a reader that sat at the front of the buffer can have been
      # pinning the oldest values, so trimBuffer skips the scan otherwise.
      self.trimBuffer(indexBeforeRead)
      return result

  def populateBuffer(self, targetIndex):
      """Pull values from the source until ``buffer_end`` reaches targetIndex.

      Raises EndOfStreamException (and sets ``eof``) if the source runs out
      first; values already pulled stay in the buffer.
      """
      while self.buffer_end < targetIndex:
          try:
              # Builtin next() works on Python 2.6+ and 3 (.next() is 2-only).
              nextValue = next(self.generator)
          except StopIteration:
              self.eof = True
              raise EndOfStreamException()
          self.buffer.append(nextValue)
          self.buffer_end += 1

  def trimBuffer(self, index):
      """Drop buffered values that no live, valid reader can still reach.

      `index` is the position a reader occupied before it moved (or went
      away).  Unless that was the front of the buffer nothing can have
      become unreachable, so the scan is skipped.
      """
      if index != self.buffer_start:
          return
      newBufferStart = self.buffer_end
      for readerRef in self.readers:
          reader = readerRef()
          # The referent can already be gone while its removal callback is
          # still pending (e.g. during garbage collection).
          if reader is None:
              continue
          # Readers behind buffer_start are already invalid (read() rejects
          # them).  Letting them lower the start would move it backwards and
          # misalign buffer against buffer_start.
          if self.buffer_start <= reader.index < newBufferStart:
              newBufferStart = reader.index
      dropSize = newBufferStart - self.buffer_start
      if dropSize > 0:
          del self.buffer[:dropSize]
          self.buffer_start = newBufferStart

  def getBufferContents(self, start, end):
      """Return the buffered values for absolute stream indices [start, end)."""
      start -= self.buffer_start
      end -= self.buffer_start
      return self.buffer[start:end]

  def createReader(self, index=0):
      """Create and register a reader positioned at absolute index `index`."""
      reader = Reader(self, index)
      self.readers.append(weakref.ref(reader, self.removeReader))
      return reader

  def removeReader(self, ref):
      """Weakref callback: forget a dead reader and release what it pinned."""
      self.readers.remove(ref)
      # On CPython 3 a dying reader's __del__ runs before this callback,
      # while its weakref still resolves, so that trim still counted it.
      # Re-trim now that it is really gone.
      self.trimBuffer(self.buffer_start)


class Reader:
  """A forward-only cursor into a BufferedStream.

  `buffer` is the owning BufferedStream and `index` is the absolute stream
  position of the next value this reader will return.
  """
  def __init__(self, buffer, index=0):
      self.buffer, self.index = buffer, index

  def read(self, count):
      """Return the next `count` values and advance past them."""
      stream = self.buffer
      return stream.read(self, count)

  def readChar(self):
      """Return the single next value."""
      chunk = self.read(1)
      return chunk[0]

  def clone(self):
      """Return a new reader registered on the same stream at this position."""
      here = self.index
      return self.buffer.createReader(here)

  def __del__(self):
      # Give the stream a chance to drop values only this reader was holding.
      stream, here = self.buffer, self.index
      stream.trimBuffer(here)

It's sometimes useful to be able to backtrack when performing operations (like parsing) on an incoming stream. Using this implementation, one can clone a reader and send it forward into the stream while keeping a reader at the original position in reserve:

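<pre>
stream = BufferedStream("Hello world")
reader1 = stream.createReader()
reader2 = reader1.clone()

if "".join(reader1.read(7)) == "Bonjour":
    print("French")
elif "".join(reader2.read(5)) == "Hello":
    print("English")
</pre>

Because `read` returns a list of values (here, single characters), the result is joined with `"".join` before comparing. Note that `string.join` would insert spaces between the characters. Various useful kinds of nondeterminism can be simulated in this manner.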