Welcome, guest | Sign In | My Account | Store | Cart
"""This is very basic skeleton for data processing application implementing
consumer pattern.
"""

__author__ = 'vovanec@gmail.com'

from concurrent.futures import ThreadPoolExecutor

import functools
import logging
import signal
import threading


# Default size of the worker thread pool (and of the in-flight task limit).
DEFAULT_NUM_WORKERS = 16


class ConsumerAppBase(object):
    """Base class for a task consumer application.

    The main loop repeatedly asks the subclass for the next task
    (_get_next_task) and submits it to a ThreadPoolExecutor, where it is
    processed by _run_task. A semaphore initialised to the number of workers
    bounds how many tasks may be in flight at once: a slot is taken before
    a task is submitted and given back when the task's future completes.

    Subclasses MUST implement _get_next_task() and _run_task(); they MAY
    override _on_task_done(), _on_start() and _on_stop().
    """

    # Seconds to wait before retrying when the queue is empty, fetching the
    # next task failed, or no worker slot is free.
    sleep_timeout = 3

    # Signals that request a graceful stop. They are looked up by name
    # because not every platform defines all of them (for example SIGHUP
    # and SIGQUIT do not exist on Windows).
    _STOP_SIGNAL_NAMES = ('SIGHUP', 'SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGABRT')

    def __init__(self, app_name, num_workers=DEFAULT_NUM_WORKERS):
        """Constructor.

        :param str app_name: application name, also used as the logger name.
        :param int num_workers: number of worker threads.
        """

        self._log = logging.getLogger(app_name)
        self._app_name = app_name
        self._stop_event = threading.Event()
        self._task_executor = ThreadPoolExecutor(max_workers=num_workers)
        # One slot per worker: limits the number of submitted-but-unfinished
        # tasks to num_workers.
        self._task_limiter = threading.Semaphore(value=num_workers)

    def run(self):
        """Run application.

        Installs signal handlers, calls _on_start(), runs the main loop
        until the application is told to stop, then shuts the executor down
        (waiting for running tasks) and calls _on_stop().

        :return: 0 on clean exit, 1 if _on_start(), the main loop or
                 _on_stop() raised.
        :rtype: int
        """

        exit_status = 0
        self._install_signal_handlers()

        try:
            self._on_start()
            self._main_loop()
        except BaseException as exc:
            self._log.exception('Unrecoverable exception in %s main loop. '
                                'Exiting: %s', self._app_name, exc)
            exit_status = 1
        finally:
            self._stop_event.set()
            self._stop_task_executor()

            # A failing stop hook must not escape from ``finally``: that
            # would discard the exit status this method promises to return.
            try:
                self._on_stop()
            except Exception as exc:
                self._log.exception('Exception in %s stop hook: %s',
                                    self._app_name, exc)
                exit_status = 1

            self._log.info('Done.')

        return exit_status

    def stop(self):
        """Tell the main loop to stop and shutdown.
        """

        self._stop_event.set()

    def _get_next_task(self):
        """Get next task for processing. Subclasses MUST implement this method.

        :return: next task object. If None returned - framework assumes the
                 input queue is empty and waits for sleep_timeout seconds
                 before calling _get_next_task() again.

        :rtype: object|None
        """

        raise NotImplementedError

    def _run_task(self, task):
        """Run task in separate worker thread of ThreadPoolExecutor.
        Subclasses MUST implement this method.

        :param object task: task item.

        :rtype: object|None
        :return: this method should return task execution result, that will be
                 available in _on_task_done() callback wrapped in
                 concurrent.futures.Future object.
        """

        raise NotImplementedError

    def _on_task_done(self, task, future):
        """This callback is being called after task finished.

        Subclasses may implement this method to handle tasks results,
        perform some cleanup etc. Exceptions raised here are logged and
        otherwise ignored.

        :param object task: task item.
        :param concurrent.futures.Future future: future that wraps
               task execution result.

        :rtype: None
        """

        pass

    def _on_start(self):
        """Subclasses may re-implement this method to add custom logic on
        application start, right before entering the main loop.
        """

        pass

    def _on_stop(self):
        """Subclasses may re-implement this method to add custom logic on
        application stop, right after exiting the main loop.
        """

        pass

    # Private methods

    def _install_signal_handlers(self):
        """Install signal handlers that request a graceful stop.

        Signals missing on the current platform are skipped. If a handler
        cannot be installed (signal.signal() raises ValueError when called
        outside the main thread, or for signals the platform does not
        accept) a warning is logged and the application keeps going; it can
        still be stopped with stop().
        """

        self._log.info('Installing signal handlers')

        def handler(signum, _):
            """Signal handler.
            """

            self._log.info('Got signal %s', signum)
            self._stop_event.set()

        for sig_name in self._STOP_SIGNAL_NAMES:
            sig = getattr(signal, sig_name, None)
            if sig is None:
                self._log.debug(
                    'Signal %s is not available on this platform', sig_name)
                continue

            try:
                signal.signal(sig, handler)
            except (ValueError, OSError) as exc:
                self._log.warning(
                    'Could not install handler for %s: %s', sig_name, exc)

    def _main_loop(self):
        """Main loop.

        Fetches tasks and submits them for processing until the stop event
        is set.
        """

        self._log.info('Entering the main loop')

        while not self._stop_event.is_set():

            # Try to get the next task. If exception occurred - wait and retry.
            try:
                task = self._get_next_task()
            except Exception as exc:
                self._log.exception(
                    'Failed to get next task for processing: %s. Sleeping '
                    'for %s seconds before retry.', exc, self.sleep_timeout)

                # Waiting on the event (rather than time.sleep) lets a stop
                # request cut the pause short.
                self._stop_event.wait(self.sleep_timeout)

                continue

            # If task is None - wait and retry to get the next task.
            if task is None:
                self._log.info(
                    'Task queue is empty. Sleeping for %s seconds before '
                    'retry.', self.sleep_timeout)

                self._stop_event.wait(self.sleep_timeout)

                continue

            self._log.debug('Got next task for processing: %s', task)
            if self._submit_task(task):
                self._log.debug('Successfully submitted task %s for processing',
                                task)
            else:
                # Submission was interrupted because application
                # has been told to stop. Exit the main loop.
                break

        self._log.info('%s has been told to stop. Exiting.', self._app_name)

    def _submit_task(self, task):
        """Submit task to the pool executor for processing.

        Retries every sleep_timeout seconds while no worker slot is free or
        while submission fails, until the application is told to stop.

        :param object task: task item.

        :return: True if submission was successful, False if submission was
                 interrupted because application is about to exit.

        :rtype: bool
        """

        while not self._stop_event.is_set():

            if self._task_limiter.acquire(blocking=False):
                try:
                    task_done_cb = functools.partial(self._task_done, task)
                    self._task_executor.submit(
                        self._run_task, task).add_done_callback(task_done_cb)

                    return True
                except Exception as exc:
                    # Nothing was scheduled, so give the slot back.
                    self._task_limiter.release()
                    self._log.exception(
                        'Could not submit task for processing: %s. '
                        'Sleeping for %s seconds before next try.',
                        exc, self.sleep_timeout)
            else:
                self._log.info(
                    'No free workers. Sleeping for %s seconds before next try.',
                    self.sleep_timeout)

            self._stop_event.wait(self.sleep_timeout)

        return False

    def _task_done(self, task, future):
        """Called when task is done.

        Frees the worker slot first, so a misbehaving _on_task_done() hook
        can never leak it, then invokes the hook.

        :param object task: task item.
        :param concurrent.futures.Future future: future object.
        """

        self._task_limiter.release()

        try:
            self._on_task_done(task, future)
        except Exception as exc:
            self._log.exception(
                'Exception in task done callback for task %s: %s', task, exc)

    def _stop_task_executor(self):
        """Stop task executor instance, waiting for running tasks to finish.
        """

        if self._task_executor:
            self._log.info('Stopping task executor')
            try:
                self._task_executor.shutdown(wait=True)
            except Exception as exc:
                self._log.exception(
                    'Exception while trying to stop task executor: %s', exc)


def main():
    """Run the example application.

    The example consumer produces a random integer as a "task" every
    10 ms, halves it in a worker thread and logs the result.

    :return: exit status returned by ConsumerAppBase.run().
    :rtype: int
    """

    # Imported once here instead of on every _get_next_task() call.
    import random
    import time

    class ExampleApp(ConsumerAppBase):
        """Example application.
        """

        def _get_next_task(self):
            """Simulate a short wait on an input queue, then return a
            random integer task.
            """

            time.sleep(.01)
            return random.randint(0, 1000)

        def _run_task(self, task):
            """Process the task: halve it.
            """

            return task / 2

        def _on_task_done(self, task, future):
            """Log the task result.
            """

            self._log.info('Task done. Result: %s', future.result())

    logging.basicConfig(level=logging.DEBUG)

    # Hand the status back so the process exit code reflects failures.
    return ExampleApp('example').run()


if __name__ == '__main__':

    # SystemExit carries main()'s status out as the process exit code.
    raise SystemExit(main())

Diff to Previous Revision

--- revision 1 2015-06-30 03:18:13
+++ revision 2 2015-06-30 03:24:07
@@ -1,3 +1,9 @@
+"""This is very basic skeleton for data processing application implementing
+consumer pattern.
+"""
+
+__author__ = 'vovanec@gmail.com'
+
 from concurrent.futures import ThreadPoolExecutor
 
 import functools

History