The following recipe shows an example of the bounded buffer problem and its solution. Fortunately in Python, this is very easily solved with the Queue class from the Queue module. Even creating a buffer with a maximum size limit becomes rather easy with the automatic blocking feature (when trying to put when the Queue is full or when trying to get when the Queue is empty). Overall, this is just a simple example and approach to a classic problem.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | from os.path import basename
from Queue import Queue
from random import random
from sys import argv, exit
from threading import Thread
from time import sleep
# for creating widgets
class Widget:
pass
# for creating stacks
class Stack:
def __init__(self):
self.__stack = list()
def __len__(self):
return len(self.__stack)
def push(self, item):
self.__stack.append(item)
def pop(self):
return self.__stack.pop()
# provides an outline for the execution of the program
def main():
# check and parse the command line arguments
parse_sys_argv()
# setup the variables used by the threads
run_flag = [True]
queue = Queue(argv[1])
send = Stack()
recv = Stack()
# start the threads
producer = Thread(target=produce, args=(run_flag, queue, send))
consumer = Thread(target=consume, args=(run_flag, queue, recv, producer))
producer.start()
consumer.start()
# let the threads do their work
sleep(argv[2])
run_flag[0] = False
consumer.join()
# verify that the solution was valid
calculate_results(send, recv)
# parses and checks the command line arguments
def parse_sys_argv():
try:
# there should be two command line arguments
assert len(argv) == 3
# convert <buf_size> and check
argv[1] = abs(int(argv[1]))
assert argv[1] > 0
# convert <run_time> and check
argv[2] = abs(float(argv[2]))
assert argv[2] > 0
except:
# print out usage information
print basename(argv[0]),
print '<buf_size> <run_time>'
# exits the program
exit(1)
# called by the producer thread
def produce(run_flag, queue, send):
while run_flag[0]:
# simulate production
sleep(random())
# put widget in buffer
item = Widget()
queue.put(item)
send.push(item)
# called by the consumer thread
def consume(run_flag, queue, recv, producer):
# consume items while running
while run_flag[0]:
do_consume(queue, recv)
# empty the queue to allow maximum room
while not queue.empty():
do_consume(queue, recv)
# wait for the producer to end
producer.join()
# consume any other items that might have been produced
while not queue.empty():
do_consume(queue, recv)
# executes one consumption operation
def do_consume(queue, recv):
# get a widget from the queue
recv.push(queue.get())
# simulate consumption
sleep(random())
# verifies that send and recv were equal
def calculate_results(send, recv):
print 'Solution has',
try:
# make sure that send and recv have the same length
assert len(send) == len(recv)
# check all of the contents of send and recv
while send:
# check the identity of the items in send and recv
assert send.pop() is recv.pop()
print 'passed.'
except:
print 'failed.'
# starts the program
if __name__ == '__main__':
main()
|
This code is mainly meant to be used as an example of the bounded buffer problem and how to solve it. One of the best features of this code involves the Stack called send and the Stack called recv. They Are used by the producer and consumer threads and by the calculate_results function. The purpose of said function is to proove that the solution was indeed a success.