Welcome, guest | Sign In | My Account | Store | Cart

Consumer Application Skeleton

This is a very basic skeleton for a data processing application implementing the consumer pattern:

while is_running():
    task = get_next_task_from_queue()
    if task:
        submit_task_for_processing(task)
    else:
        sleep_for_a_moment()

Here's an example:

class ExampleApp(ConsumerAppBase):
    """Example consumer that halves every task taken from a queue."""

    def _get_next_task(self):

        # Fetch the next task from the queue. NOTE: self._queue is not
        # created in this snippet - presumably the application sets it up.
        return self._queue.next()

    def _run_task(self, task):

        # This runs in a separate worker thread of the
        # ThreadPoolExecutor; the return value becomes the task result.
        return task / 2

    def _on_task_done(self, task, future):

        # Called once the worker thread has finished; the task result is
        # available here wrapped in a concurrent.futures.Future object.
        self._log.info('Task done. Result: %s', future.result())
Python, 272 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""This is very basic skeleton for data processing application implementing
consumer pattern.
"""

__author__ = 'vovanec@gmail.com'

from concurrent.futures import ThreadPoolExecutor

import functools
import logging
import signal
import threading


# Default size of the worker thread pool (and of the in-flight task limit).
DEFAULT_NUM_WORKERS = 16


class ConsumerAppBase(object):
    """Base class for task consumer application.

    The main loop repeatedly asks the subclass for the next task and submits
    it to a ThreadPoolExecutor. A semaphore holding one permit per worker
    bounds the number of submitted-but-unfinished tasks: a task is only
    submitted once a permit could be acquired, and the permit is returned
    when the task is done.
    """

    # Seconds to wait before retrying when the queue is empty, when fetching
    # or submitting a task failed, or when all workers are busy.
    sleep_timeout = 3

    # Names of the signals that request a graceful stop. They are looked up
    # by name because not every platform defines all of them.
    _shutdown_signal_names = ('SIGHUP', 'SIGINT', 'SIGTERM',
                              'SIGQUIT', 'SIGABRT')

    def __init__(self, app_name, num_workers=DEFAULT_NUM_WORKERS):
        """Constructor.

        :param str app_name: application name, also used as the logger name.
        :param int num_workers: number of worker threads.
        """

        self._log = logging.getLogger(app_name)
        self._app_name = app_name
        self._stop_event = threading.Event()
        self._task_executor = ThreadPoolExecutor(max_workers=num_workers)
        self._task_limiter = threading.Semaphore(value=num_workers)

    def run(self):
        """Run application.

        Installs the signal handlers, calls _on_start() and enters the main
        loop. On the way out the stop event is set, the task executor is shut
        down (waiting for submitted tasks) and _on_stop() is called.

        :return: 0 on clean exit, 1 if _on_start() or the main loop raised.
        :rtype: int
        """

        exit_status = 0
        self._install_signal_handlers()

        try:
            self._on_start()
            self._main_loop()
        except BaseException as exc:
            self._log.exception('Unrecoverable exception in %s main loop. '
                                'Exiting: %s', self._app_name, exc)
            exit_status = 1
        finally:
            self._stop_event.set()
            self._stop_task_executor()
            self._on_stop()

            self._log.info('Done.')

        return exit_status

    def stop(self):
        """Tell the main loop to stop and shutdown.
        """

        self._stop_event.set()

    def _get_next_task(self):
        """Get next task for processing. Subclasses MUST implement this method.

        :return: next task object. If None returned - framework assumes the
                 input queue is empty and waits for
                 ConsumerAppBase.sleep_timeout seconds before calling
                 _get_next_task() again.

        :rtype: object|None
        """

        raise NotImplementedError

    def _run_task(self, task):
        """Run task in separate worker thread of ThreadPoolExecutor.
        Subclasses MUST implement this method.

        :param object task: task item.

        :rtype: object|None
        :return: this method should return task execution result, that will be
                 available in _on_task_done() callback wrapped in
                 concurrent.futures.Future object.
        """

        raise NotImplementedError

    def _on_task_done(self, task, future):
        """This callback is being called after task finished.

        Subclasses may implement this method to handle tasks results,
        perform some cleanup etc. Exceptions raised here are logged by the
        framework and do not affect other tasks.

        :param object task: task item.
        :param concurrent.futures.Future future: future that wraps
               task execution result.

        :rtype: None
        """

        pass

    def _on_start(self):
        """Subclasses may re-implement this method to add custom logic on
        application start, right before entering the main loop.
        """

        pass

    def _on_stop(self):
        """Subclasses may re-implement this method to add custom logic on
        application stop, right after exiting the main loop.
        """

        pass

    # Private methods

    def _install_signal_handlers(self):
        """Install signal handlers for the process.

        Every signal named in _shutdown_signal_names gets a handler that sets
        the stop event. Signals the platform does not define are skipped. A
        failure to install a handler (signal.signal() refuses to work outside
        of the main thread) is logged instead of aborting the application;
        stop() can still be used to end the main loop in that case.
        """

        self._log.info('Installing signal handlers')

        def handler(signum, _):
            """Signal handler.
            """

            self._log.info('Got signal %s', signum)
            self._stop_event.set()

        for sig_name in self._shutdown_signal_names:
            sig = getattr(signal, sig_name, None)
            if sig is None:
                self._log.debug(
                    'Signal %s is not available on this platform. Skipping.',
                    sig_name)
                continue

            try:
                signal.signal(sig, handler)
            except (ValueError, OSError) as exc:
                self._log.warning(
                    'Could not install handler for signal %s: %s',
                    sig_name, exc)

    def _main_loop(self):
        """Main loop.

        Runs until the stop event is set: fetches the next task, waits and
        retries if fetching failed or there was no task, otherwise submits
        the task for processing.
        """

        self._log.info('Entering the main loop')

        while not self._stop_event.is_set():

            # Try to get the next task. If exception occurred - wait and retry.
            try:
                task = self._get_next_task()
            except Exception as exc:
                self._log.exception(
                    'Failed to get next task for processing: %s. Sleeping '
                    'for %s seconds before retry.', exc, self.sleep_timeout)

                # Waiting on the event (rather than sleeping) lets a stop
                # request interrupt the pause immediately.
                self._stop_event.wait(self.sleep_timeout)

                continue

            # If task is None - wait and retry to get the next task.
            if task is None:
                self._log.info(
                    'Task queue is empty. Sleeping for %s seconds before '
                    'retry.', self.sleep_timeout)

                self._stop_event.wait(self.sleep_timeout)

                continue

            self._log.debug('Got next task for processing: %s', task)
            if self._submit_task(task):
                self._log.debug('Successfully submitted task %s for processing',
                                task)
            else:
                # Submission was interrupted because application
                # has been told to stop. Exit the main loop.
                break

        self._log.info('%s has been told to stop. Exiting.', self._app_name)

    def _submit_task(self, task):
        """Submit task to the pool executor for processing.

        Keeps retrying, pausing for sleep_timeout seconds between attempts,
        until a worker permit is available and the submission succeeded or
        the application has been told to stop.

        :param object task: task item.

        :return: True if submission was successful, False if submission was
                 interrupted because application is about to exit.

        :rtype: bool
        """

        while not self._stop_event.is_set():

            if self._task_limiter.acquire(blocking=False):
                try:
                    task_done_cb = functools.partial(self._task_done, task)
                    self._task_executor.submit(
                        self._run_task, task).add_done_callback(task_done_cb)

                    return True
                except Exception as exc:
                    # The task never reached a worker, so the permit taken
                    # above has to be handed back here.
                    self._task_limiter.release()
                    self._log.exception(
                        'Could not submit task for processing: %s. '
                        'Sleeping for %s seconds before next try.',
                        exc, self.sleep_timeout)
            else:
                self._log.info(
                    'No free workers. Sleeping for %s seconds before next try.',
                    self.sleep_timeout)

            self._stop_event.wait(self.sleep_timeout)

        return False

    def _task_done(self, task, future):
        """Called when task is done.

        Returns the worker permit and then invokes the _on_task_done()
        callback. An exception raised by the callback is logged and
        swallowed: this method may be invoked synchronously from
        _submit_task() when the future is already finished, and an escaping
        exception there would be mistaken for a failed submission.

        :param object task: task item.
        :param concurrent.futures.Future future: future object.
        """

        # Release first so that a slow or failing callback does not keep
        # the worker slot occupied.
        self._task_limiter.release()

        try:
            self._on_task_done(task, future)
        except Exception as exc:
            self._log.exception(
                'Unhandled exception in _on_task_done() callback for '
                'task %s: %s', task, exc)

    def _stop_task_executor(self):
        """Stop task executor instance, waiting for submitted tasks to finish.
        """

        if self._task_executor:
            self._log.info('Stopping task executor')
            try:
                self._task_executor.shutdown(wait=True)
            except Exception as exc:
                self._log.exception(
                    'Exception while trying to stop task executor: %s', exc)



def main():
    """Run the example application: a consumer that halves random integers.
    """

    import random
    import time

    class ExampleApp(ConsumerAppBase):
        """Example consumer fed with random integers.
        """

        def _get_next_task(self):

            # Pause for 10 ms, then produce a random integer as the task.
            time.sleep(0.01)
            return random.randint(0, 1000)

        def _run_task(self, task):

            # Executed in a worker thread; the value ends up in the future.
            return task / 2

        def _on_task_done(self, task, future):

            result = future.result()
            self._log.info('Task done. Result: %s', result)

    logging.basicConfig(level=logging.DEBUG)

    app = ExampleApp('example')
    app.run()


# Start the example application only when this file is executed as a script.
if __name__ == "__main__":
    main()

The source code may also be cloned from https://github.com/vovanec/consumer_app_skeleton

Created by Vovan on Tue, 30 Jun 2015 (MIT)
Python recipes (4591)
Vovan's recipes (3)

Required Modules

  • (none specified)

Other Information and Tasks