Source code for zsl.interface.task_queue

"""
:mod:`zsl.interface.task_queue`
-------------------------------

This module contains interfaces and functions to handle task queues in ZSL. A
task queue handles asynchronous and distributed code execution.

.. moduleauthor:: Peter Morihladko
"""
import abc
import socket
import traceback
from typing import Any, Optional, TypedDict

from zsl import Config, Injected, Zsl, inject
from zsl.router.task import TaskRouter
from zsl.task.job_context import Job, JobContext


class KillWorkerException(Exception):
    """Signal that the standalone worker running the task must shut down.

    A task raises this exception to ask the worker executing it to stop.
    """
class _JobResultRequired(TypedDict):
    """Keys that every job result carries."""

    # Path of the task that produced the result.
    task_name: Any
    # Value returned by the task callable, or ``None`` for a failed task.
    data: Any


class JobResult(_JobResultRequired, total=False):
    """Result of a job execution as handed back to the task queue.

    ``task_name`` and ``data`` are always present. ``error`` is optional: it
    is only set on results built for a failed task, where it holds the string
    form of the raised exception.
    """

    # String form of the exception raised by the task; absent on success.
    error: str
@inject(app=Zsl, task_router=TaskRouter)
def execute_job(
    job: Job, app: Zsl = Injected, task_router: TaskRouter = Injected
) -> JobResult:
    """Route a job to its task, run the task and wrap the outcome.

    The job path is resolved through the task router, a :class:`JobContext`
    is built from the job and the routed task, and the task callable is then
    invoked with the task data held by that context.

    :param job: job to execute
    :type job: Job
    :param app: application instance, injected
    :type app: Zsl
    :param task_router: task router instance, injected
    :type task_router: TaskRouter
    :return: dictionary with the task path and the value the task returned
    :rtype: dict
    """
    log = app.logger
    log.info("Job fetched, preparing the task '{0}'.".format(job.path))

    routed_task, routed_callable = task_router.route(job.path)
    context = JobContext(job, routed_task, routed_callable)

    log.info("Executing task.")
    outcome = context.task_callable(context.task_data)

    log.info("Task {0} executed successfully.".format(job.path))
    return {"task_name": job.path, "data": outcome}
class TaskQueueWorker(metaclass=abc.ABCMeta):
    """Task queue worker abstraction.

    A task queue worker is responsible for communicating with a task queue and
    executing any task given by it. It should be able to run as a stand alone
    application.

    Concrete workers implement :meth:`run` and :meth:`stop_worker`; job
    execution and error reporting are provided here.
    """

    @inject(app=Zsl, config=Config)
    def __init__(self, app: Zsl = Injected, config: Config = Injected):
        """Store the injected application and configuration.

        :param app: application instance, injected
        :type app: Zsl
        :param config: application configuration, injected
        :type config: Config
        """
        self._app = app
        self._config = config
        # Stop flag, initially cleared. This base class never reads it;
        # presumably it is meant for concrete workers - confirm in subclasses.
        self._should_stop = False

    @staticmethod
    def _get_client_id() -> str:
        """Return client id.

        The id is the literal prefix ``zsl-client-`` followed by the host name
        reported by :func:`socket.gethostname`.

        :return: client id
        :rtype: str
        """
        return "zsl-client-{0}".format(socket.gethostname())

    def handle_exception(self, e: Exception, task_path: str) -> JobResult:
        """Handle exception raised during task execution.

        The exception message and its traceback are logged as an error and a
        failure result is built for the task queue.

        :param e: exception
        :type e: Exception
        :param task_path: task path
        :type task_path: str
        :return: exception as task result; ``data`` is ``None`` and ``error``
                 holds ``str(e)``
        :rtype: dict
        """
        # Format the traceback of ``e`` itself. ``traceback.format_exc()``
        # reports whichever exception is being handled at call time, so it
        # logs "NoneType: None" when this method is called outside of an
        # ``except`` block. Inside the handler for ``e`` both produce the same
        # text.
        formatted = "".join(
            traceback.format_exception(type(e), e, e.__traceback__)
        )
        self._app.logger.error(str(e) + "\n" + formatted)
        return {"task_name": task_path, "data": None, "error": str(e)}

    def execute_job(self, job: Job) -> Optional[JobResult]:
        """Execute job given by the task queue.

        A :class:`KillWorkerException` raised by the task stops the worker via
        :meth:`stop_worker`; any other exception is converted to a failure
        result by :meth:`handle_exception`.

        :param job: job
        :type job: Job
        :return: task result, or ``None`` when the task requested the worker
                 to stop
        :rtype: dict
        """
        try:
            # Resolves to the module-level ``execute_job`` function: names
            # defined in the class body are not visible from method bodies.
            return execute_job(job)
        except KillWorkerException:
            self._app.logger.info("Stopping Gearman worker on demand flag set.")
            self.stop_worker()
            # No result is produced for a job that asked the worker to stop.
            return None
        except Exception as e:
            return self.handle_exception(e, job.path)

    @abc.abstractmethod
    def run(self, *args, **kwargs):
        """Run the worker."""
        pass

    @abc.abstractmethod
    def stop_worker(self):
        """Stop the worker."""
        pass
@inject(worker=TaskQueueWorker)
def _get_worker(worker: TaskQueueWorker = Injected) -> TaskQueueWorker:
    """Return the task queue worker supplied by dependency injection.

    :param worker: task queue worker instance, injected
    :type worker: TaskQueueWorker
    :return: the injected worker, unchanged
    :rtype: TaskQueueWorker
    """
    return worker
def run_worker(*args: Any, **kwargs: Any) -> None:
    """Run the app as a task queue worker.

    The worker instance is given as a DI module. All positional and keyword
    arguments are forwarded unchanged to the worker's ``run`` method.
    """
    _get_worker().run(*args, **kwargs)