"""
:mod:`zsl.interface.celery.worker`
----------------------------------
Implementation of celery workers.
.. moduleauthor:: Peter Morihladko
"""
from celery import Celery, shared_task
from zsl import Config, Injected, inject
from zsl.interface.task_queue import JobResult, TaskQueueWorker
from zsl.task.job_context import Job, JobData
[docs]
class CeleryTaskQueueWorkerBase(TaskQueueWorker):
"""Base class for celery task queue worker.
It contains only the task execution part of worker.
"""
[docs]
def execute_celery_task(self, job_data: dict) -> JobResult:
"""Creates job from task and executes the job.
:param job_data: job's data
:return: job results
:rtype: dict
"""
job = Job(job_data)
return self.execute_job(job)
[docs]
class CeleryTaskQueueOutsideWorker(CeleryTaskQueueWorkerBase):
"""Celery worker used only for task execution.
This can be used when the worker is managed with some other application,
like `celery worker` or `celery multi`.
"""
[docs]
def stop_worker(self):
self._app.logger.error("Running from celery worker, kill from shell!")
[docs]
def run(self):
self._app.logger.error("Running from celery worker, start from shell!")
[docs]
@inject(config=Config)
def create_celery_app(config: Config = Injected) -> Celery:
celery_app = Celery()
celery_app.config_from_object(config["CELERY"])
return celery_app
[docs]
class CeleryTaskQueueMainWorker(CeleryTaskQueueWorkerBase):
"""Worker implementation for Celery task queue."""
def __init__(
self,
):
super().__init__()
self.celery_app = create_celery_app()
[docs]
def stop_worker(self):
self._app.logger.error(
"This is a celery app worker, kill the instance to stop it."
)
[docs]
def run(self, argv: list[str]):
"""
Run the celery worker cmd with given arguments from the list.
Note: the first argument should be "worker".
"""
self._app.logger.info("Running the worker.")
self.celery_app.worker_main(argv)
[docs]
@shared_task
@inject(worker=CeleryTaskQueueWorkerBase)
def zsl_task(job_data: JobData, worker: CeleryTaskQueueWorkerBase = Injected) -> JobResult:
"""
Executes a task registered with Celery using the provided job data.
`job_data` is a dictionary that describes the path to the desired task along with the payload. Specifically,
it should contain a 'path' key pointing to the task and a 'data' key with the payload as a dictionary.
:param job_data: A dictionary containing the path to the task and its payload.
:type job_data: dict
:param worker: The Celery worker responsible for executing the task. *Injected.*
:type worker: CeleryTaskQueueWorkerBase
:return: The result of the executed task.
:rtype: JobResult
:Example:
>>> job_data = {
"path": "some_module/some_task_function",
"data": {
"param1": "value1",
"param2": "value2"
}
}
"""
return worker.execute_celery_task(job_data)