Source code for zsl.application.modules.celery_module

"""
:mod:`zsl.application.modules.celery_module`
--------------------------------------------
"""
import click
from injector import Module, singleton

from zsl import Injected
from zsl.application.modules.cli_module import ZslCli
from zsl.interface.celery.worker import (CeleryTaskQueueMainWorker, CeleryTaskQueueOutsideWorker,
                                         CeleryTaskQueueWorkerBase)
from zsl.interface.task_queue import TaskQueueWorker, run_worker
from zsl.utils.injection_helper import inject, simple_bind


class CeleryTaskQueueMainWorkerModule(Module):
    """Injector module that registers the main celery worker.

    A single :class:`CeleryTaskQueueMainWorker` instance is created and bound
    under both worker interfaces, so either lookup yields the same object.
    """

    def configure(self, binder):
        """Bind one main worker instance to both worker interfaces.

        :param binder: injector binder that receives the bindings.
        """
        main_worker = CeleryTaskQueueMainWorker()
        # The same instance backs both interfaces; the order matches the
        # original registration order.
        for interface in (TaskQueueWorker, CeleryTaskQueueWorkerBase):
            binder.bind(interface, to=main_worker, scope=singleton)
class CeleryTaskQueueOutsideWorkerModule(Module):
    """Injector module that registers the celery *outside* worker.

    The outside worker is meant to be run with the help of the celery cli
    tools. One :class:`CeleryTaskQueueOutsideWorker` instance is created and
    bound under both worker interfaces.
    """

    def configure(self, binder):
        """Bind one outside worker instance to both worker interfaces.

        :param binder: injector binder that receives the bindings.
        """
        outside_worker = CeleryTaskQueueOutsideWorker()
        # The same instance backs both interfaces; the order matches the
        # original registration order.
        for interface in (TaskQueueWorker, CeleryTaskQueueWorkerBase):
            binder.bind(interface, to=outside_worker, scope=singleton)
class CeleryCli:
    """Registers a ``celery`` command group with a ``worker`` sub-command."""

    @inject(zsl_cli=ZslCli)
    def __init__(self, zsl_cli: ZslCli = Injected):
        """Attach the ``celery`` group to the Zsl CLI and keep a reference to it.

        :param zsl_cli: injected CLI holder; the group is registered on its
            ``cli`` attribute.
        """
        # Unknown options must pass through untouched so they reach the
        # celery worker instead of being rejected by click.
        passthrough_settings = {"ignore_unknown_options": True}

        # NOTE: the inner function names are significant -- click derives the
        # group/command names ("celery", "worker") from them.
        @zsl_cli.cli.group(help="Celery related tasks.")
        def celery():
            pass

        @celery.command(help="run worker", context_settings=passthrough_settings)
        @click.argument("argv", nargs=-1, type=click.UNPROCESSED)
        @click.pass_context
        def worker(_ctx, argv):
            """Run Zsl celery worker.

            :param _ctx: click context supplied by ``pass_context``; not used.
            :param argv: raw command line arguments forwarded to the celery
                worker.
            """
            # Prepend the sub-command name before handing the arguments over.
            run_worker(("worker",) + argv)

        self._celery = celery

    def celery(self):
        """Return the ``celery`` command group created in the constructor."""
        return self._celery
class CeleryCliModule(Module):
    """Injector module that makes :class:`CeleryCli` available as a singleton."""

    def configure(self, binder):
        """Register :class:`CeleryCli` on the binder in singleton scope.

        :param binder: injector binder that receives the binding.
        """
        simple_bind(binder, CeleryCli, singleton)