Latest recipes tagged "consumer"
http://code.activestate.com/recipes/tags/consumer/new/
2015-06-30T03:24:07-07:00
ActiveState Code Recipes

Consumer Application Skeleton (Python)
2015-06-30T03:24:07-07:00
Vovan
http://code.activestate.com/recipes/users/4192447/
http://code.activestate.com/recipes/579074-consumer-application-skeleton/
<p style="color: grey">
Python
recipe 579074
by <a href="/recipes/users/4192447/">Vovan</a>
(<a href="/recipes/tags/application/">application</a>, <a href="/recipes/tags/consumer/">consumer</a>, <a href="/recipes/tags/daemon/">daemon</a>, <a href="/recipes/tags/framework/">framework</a>).
Revision 2.
</p>
<h4 id="consumer-application-skeleton">Consumer Application Skeleton</h4>
<p>This is a very basic skeleton for a data-processing application that
implements the consumer pattern:</p>
<pre class="prettyprint"><code>while is_running():
    task = get_next_task_from_queue()
    if task:
        submit_task_for_processing(task)
    else:
        sleep_for_a_moment()
</code></pre>
<p>Here's an example:</p>
<pre class="prettyprint"><code>class ExampleApp(ConsumerAppBase):

    def _get_next_task(self):
        # Get the next task from the queue.
        return self._queue.next()

    def _run_task(self, task):
        # This code runs in a separate worker thread of the
        # ThreadPoolExecutor.
        return task / 2

    def _on_task_done(self, task, future):
        # Once the worker thread has finished, the task result is available
        # in the _on_task_done() callback as a concurrent.futures.Future object.
        self._log.info('Task done. Result: %s', future.result())
</code></pre>
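<p>The recipe's own <code>ConsumerAppBase</code> is not reproduced in this listing, so the example above cannot run by itself. The following is only a minimal sketch of what such a base class might look like, assuming it wraps the loop shown earlier around a <code>ThreadPoolExecutor</code>. The constructor arguments, the <code>stop()</code> method, and the <code>HalvingApp</code> subclass are hypothetical names chosen for illustration, not the recipe's actual API. The sketch also tests <code>task is not None</code> rather than <code>if task:</code>, so that a falsy task such as <code>0</code> is still processed.</p>

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConsumerAppBase:
    """Hypothetical base class; the recipe's real implementation may differ."""

    def __init__(self, max_workers=2, idle_sleep=0.01):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._stop = threading.Event()
        self._idle_sleep = idle_sleep

    def stop(self):
        # Ask the main loop to exit after the current iteration.
        self._stop.set()

    def run(self):
        try:
            while not self._stop.is_set():
                task = self._get_next_task()
                if task is not None:
                    future = self._executor.submit(self._run_task, task)
                    # Bind the task now so the callback reports the right one.
                    future.add_done_callback(
                        lambda f, t=task: self._on_task_done(t, f))
                else:
                    time.sleep(self._idle_sleep)
        finally:
            # Wait for any tasks that are still running.
            self._executor.shutdown(wait=True)


class HalvingApp(ConsumerAppBase):
    """Illustrative subclass: halves each number, then stops itself."""

    def __init__(self, items):
        super().__init__()
        self._queue = queue.Queue()
        for item in items:
            self._queue.put(item)
        self._lock = threading.Lock()
        self._remaining = len(items)
        self.results = []

    def _get_next_task(self):
        try:
            return self._queue.get_nowait()
        except queue.Empty:
            return None

    def _run_task(self, task):
        return task / 2

    def _on_task_done(self, task, future):
        # Callbacks may run in worker threads, so guard shared state.
        with self._lock:
            self.results.append((task, future.result()))
            self._remaining -= 1
            if self._remaining == 0:
                self.stop()


if __name__ == "__main__":
    app = HalvingApp([2, 4, 6])
    app.run()
    print(sorted(app.results))
```

<p>In this sketch the subclass stops the loop once every submitted task has reported back; a long-running daemon would more likely call <code>stop()</code> from a signal handler instead.</p>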
Queue with tagged items (Python)
2009-01-28T07:23:40-08:00
Jirka Vejrazka
http://code.activestate.com/recipes/users/4168986/
http://code.activestate.com/recipes/576632-queue-with-tagged-items/
<p style="color: grey">
Python
recipe 576632
by <a href="/recipes/users/4168986/">Jirka Vejrazka</a>
(<a href="/recipes/tags/consumer/">consumer</a>, <a href="/recipes/tags/producer/">producer</a>, <a href="/recipes/tags/queue/">queue</a>, <a href="/recipes/tags/tag/">tag</a>).
Revision 2.
</p>
<p>I needed multiple consumers to retrieve data from a queue fed by one producer. I could not find good working code for that, so I implemented my own queue. The docstring describes how it works.</p>
<p>Two notes:
1) My code uses the multiprocessing module, but the Lock and Condition objects used here could easily be replaced with their equivalents from the threading module.
2) The attached test uses the syntax of the "nose" testing package; I did not convert it to doctest or unittest.</p>
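<p>The recipe's code and docstring are not included in this listing. As a rough illustration only, here is one way a queue for one producer and several consumers could be built on <code>threading.Lock</code> and <code>threading.Condition</code>, assuming that every consumer is identified by a tag and should receive every item exactly once. The class name <code>TaggedQueue</code>, its methods, and the <code>demo()</code> helper are hypothetical and may not match the recipe's design. Because the pending items live in ordinary in-process deques, this sketch works with threads only.</p>

```python
import threading
from collections import deque


class TaggedQueue:
    """Hypothetical sketch: each registered tag receives every item once."""

    def __init__(self, tags):
        self._cond = threading.Condition(threading.Lock())
        # One pending deque per consumer tag.
        self._pending = {tag: deque() for tag in tags}

    def put(self, item):
        with self._cond:
            for pending in self._pending.values():
                pending.append(item)
            # Wake every waiting consumer; each checks its own deque.
            self._cond.notify_all()

    def get(self, tag, timeout=None):
        with self._cond:
            has_item = self._cond.wait_for(
                lambda: len(self._pending[tag]) > 0, timeout)
            if not has_item:
                raise TimeoutError("no item for tag %r" % (tag,))
            return self._pending[tag].popleft()


def demo():
    q = TaggedQueue(["a", "b"])
    received = {}

    def consume(tag, count):
        received[tag] = [q.get(tag, timeout=5) for _ in range(count)]

    threads = [threading.Thread(target=consume, args=(tag, 3))
               for tag in ("a", "b")]
    for t in threads:
        t.start()
    for i in range(3):  # the single producer
        q.put(i)
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(demo())
```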