Consumer Application Skeleton
This is very basic skeleton for data processing application implementing consumer pattern:
while is_running():
task = get_next_task_from_queue()
if task:
submit_task_for_processing(task)
else:
sleep_for_a_moment()
Here's an example:
class ExampleApp(ConsumerAppBase):
def _get_next_task(self):
# Get next task from the queue.
return self._queue.next()
def _run_task(self, task):
# This code's being executed in separate worker thread of
# ThreadPoolExecutor
return task / 2
def _on_task_done(self, task, future):
# Once worker thread finished - task results are available
# in _on_task_done() callback as a concurrent.futures.Future object.
self._log.info('Task done. Result: %s', future.result())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 | """This is very basic skeleton for data processing application implementing
consumer pattern.
"""
__author__ = 'vovanec@gmail.com'
from concurrent.futures import ThreadPoolExecutor
import functools
import logging
import signal
import threading
DEFAULT_NUM_WORKERS = 16
class ConsumerAppBase(object):
"""Base class for task consumer application.
"""
sleep_timeout = 3
def __init__(self, app_name, num_workers=DEFAULT_NUM_WORKERS):
"""Constructor.
:param str app_name: application name.
:param int num_workers: number of worker threads.
"""
self._log = logging.getLogger(app_name)
self._app_name = app_name
self._stop_event = threading.Event()
self._task_executor = ThreadPoolExecutor(max_workers=num_workers)
self._task_limiter = threading.Semaphore(value=num_workers)
def run(self):
"""Run application.
"""
exit_status = 0
self._install_signal_handlers()
try:
self._on_start()
self._main_loop()
except BaseException as exc:
self._log.exception('Unrecoverable exception in %s main loop. '
'Exiting: %s', self._app_name, exc)
exit_status = 1
finally:
self._stop_event.set()
self._stop_task_executor()
self._on_stop()
self._log.info('Done.')
return exit_status
def stop(self):
"""Tell the main loop to stop and shutdown.
"""
self._stop_event.set()
def _get_next_task(self):
"""Get next task for processing. Subclasses MUST implement this method.
:return: next task object. If None returned - framework assumes the
input queue is empty and waits for TaskProcessor.sleep_timeout
time before calling to _get_next_task() again.
:rtype: object|None
"""
raise NotImplementedError
def _run_task(self, task):
"""Run task in separate worker thread of ThreadPoolExecutor.
Subclasses MUST implement this method.
:param object task: task item.
:rtype: object|None
:return: this method should return task execution result, that will be
available in _on_task_done() callback wrapped in
concurrent.futures.Future object.
"""
raise NotImplementedError
def _on_task_done(self, task, future):
"""This callback is being called after task finished.
Subclasses may implement this method to handle tasks results,
perform some cleanup etc.
:param object task: task item.
:param concurrent.futures.Future future: future that wraps
task execution result.
:rtype: None
"""
pass
def _on_start(self):
"""Subclasses may re-implement this method to add custom logic on
application start, right before entering the main loop.
"""
pass
def _on_stop(self):
"""Subclasses may re-implement this method to add custom logic on
application stop, right after exiting the main loop.
"""
pass
# Private methods
def _install_signal_handlers(self):
"""Install signal handlers for the process.
"""
self._log.info('Installing signal handlers')
def handler(signum, _):
"""Signal handler.
"""
self._log.info('Got signal %s', signum)
self._stop_event.set()
for sig in (signal.SIGHUP, signal.SIGINT, signal.SIGTERM,
signal.SIGQUIT, signal.SIGABRT):
signal.signal(sig, handler)
def _main_loop(self):
"""Main loop.
"""
self._log.info('Entering the main loop')
while not self._stop_event.is_set():
# Try to get the next task. If exception occurred - wait and retry.
try:
task = self._get_next_task()
except Exception as exc:
self._log.exception(
'Failed to get next task for processing: %s. Sleeping '
'for %s seconds before retry.', exc, self.sleep_timeout)
self._stop_event.wait(self.sleep_timeout)
continue
# If task is None - wait and retry to get the next task.
if task is None:
self._log.info(
'Task queue is empty. Sleeping for %s seconds before '
'retry.', self.sleep_timeout)
self._stop_event.wait(self.sleep_timeout)
continue
self._log.debug('Got next task for processing: %s', task)
if self._submit_task(task):
self._log.debug('Successfully submitted task %s for processing',
task)
else:
# Submission was interrupted because application
# has been told to stop. Exit the main loop.
break
self._log.info('%s has been told to stop. Exiting.', self._app_name)
def _submit_task(self, task):
"""Submit task to the pool executor for processing.
:param object task: task item.
:return: True if submission was successful, False if submission was
interrupted because application is about to exit.
:rtype: bool
"""
while not self._stop_event.is_set():
if self._task_limiter.acquire(blocking=False):
try:
task_done_cb = functools.partial(self._task_done, task)
self._task_executor.submit(
self._run_task, task).add_done_callback(task_done_cb)
return True
except Exception as exc:
self._task_limiter.release()
self._log.exception(
'Could not submit task for processing: %s. '
'Sleeping for %s seconds before next try.',
exc, self.sleep_timeout)
else:
self._log.info(
'No free workers. Sleeping for %s seconds before next try.',
self.sleep_timeout)
self._stop_event.wait(self.sleep_timeout)
return False
def _task_done(self, task, future):
"""Called when task is done.
:param object task: task item.
:param concurrent.futures.Future future: future object.
"""
self._task_limiter.release()
self._on_task_done(task, future)
def _stop_task_executor(self):
"""Stop task executor instance.
"""
if self._task_executor:
self._log.info('Stopping task executor')
try:
self._task_executor.shutdown(wait=True)
except Exception as exc:
self._log.exception(
'Exception while trying to stop task executor: %s', exc)
def main():
"""Example application.
"""
class ExampleApp(ConsumerAppBase):
"""Example application.
"""
def _get_next_task(self):
import random
import time
time.sleep(.01)
return random.randint(0, 1000)
def _run_task(self, task):
return task / 2
def _on_task_done(self, task, future):
self._log.info('Task done. Result: %s', future.result())
logging.basicConfig(level=logging.DEBUG)
ExampleApp('example').run()
if __name__ == '__main__':
main()
|
Source code may be also cloned from https://github.com/vovanec/consumer_app_skeleton