"""
:mod:`zsl.task.job_context`
---------------------------
.. moduleauthor:: Martin Babka <babka@atteq.com>
"""
from __future__ import annotations
from abc import abstractmethod
import contextlib
from threading import current_thread
from typing import Any, Callable, Generator, TypedDict
from flask.wrappers import Request, Response
from zsl.task.task_data import TaskData
from zsl.utils.reflection_helper import proxy_object_to_delegate
[docs]
class JobData(TypedDict):
"""
A dictionary that represents the data associated with a job.
:ivar path: The path to the job data.
:vartype path: str
:ivar data: The data associated with the job.
:vartype data: dict[str, Any]
Example:
--------
>>> job_data = JobData(path='/path/to/job', data={'name': 'John', 'age': 30})
"""
path: str
data: dict[str, Any]
[docs]
class Job:
def __init__(self, data: JobData):
self.data = data
@property
def path(self) -> str:
"""Job's path.
:getter: Returns job's path
:type: str
"""
return self.data["path"]
@property
def payload(self) -> dict[str, Any]:
"""Data part of job.
:getter: Returns job's payload
:type: dict
"""
return self.data["data"]
@property
def is_valid(self) -> bool:
"""Validity of job's data.
:getter: Returns if job's data are valid
:type: bool
"""
return self.data and "path" in self.data and "data" in self.data
[docs]
class JobContext:
"""Job Context"""
def __init__(self, job: Job, task: object, task_callable: Callable):
"""
Constructor
"""
self._job = job
self._task = task
self._task_callable = task_callable
self._task_data = TaskData(self.job.payload)
JobContext._set_current_context(self)
@property
def job(self) -> Job:
return self._job
@property
def task(self) -> object:
return self._task
@property
def task_callable(self) -> Callable:
return self._task_callable
@property
def task_data(self) -> TaskData:
return self._task_data
[docs]
@classmethod
def get_current_context(cls) -> JobContext:
return current_thread()._current_job_context
@classmethod
def _set_current_context(cls, context: JobContext):
current_thread()._current_job_context = context
[docs]
class Responder:
[docs]
@abstractmethod
def respond(self, r: Response) -> None:
pass
[docs]
class StatusCodeResponder(Responder):
def __init__(self, status_code: int):
self._status_code = status_code
[docs]
def respond(self, r: Response) -> None:
r.status_code = self._status_code
[docs]
def add_responder(responder: Responder) -> None:
jc = JobContext.get_current_context()
if isinstance(jc, WebJobContext):
jc.add_responder(responder)
[docs]
class ResponseDict(TypedDict):
headers: HeadersDict
status_code: int
[docs]
def web_task_redirect(location) -> HeadersDict:
return {"headers": {"Location": location}, "status_code": 301}
[docs]
def create_job(path: str, data: dict[str, Any]) -> Job:
return Job({"data": data, "path": path})
[docs]
class WebJobContext(JobContext):
def __init__(
self,
path: str,
data: dict,
task: object,
task_callable: Callable,
request: Request,
):
"""Constructor"""
super().__init__(create_job(path, data), task, task_callable)
self._request = request
self._responders: list[Responder] = []
[docs]
def get_web_request(self) -> Request:
return self._request
[docs]
def add_responder(self, r: Responder):
self._responders.append(r)
[docs]
def notify_responders(self, response: Response):
try:
for r in self._responders:
r.respond(response)
finally:
self._responders = []
[docs]
class DelegatingJobContext(JobContext):
def __init__(self, job: Job, task: object, task_callable: Callable):
wrapped_job_context = JobContext.get_current_context()
super().__init__(job, task, task_callable)
self._wrapped_job_context = wrapped_job_context
proxy_object_to_delegate(self, wrapped_job_context)
[docs]
def stop_delegating(self):
JobContext._set_current_context(self._wrapped_job_context)
[docs]
@contextlib.contextmanager
def delegating_job_context(
job: Job, task: object, task_callable: Callable
) -> Generator[DelegatingJobContext, None, None]:
djc = DelegatingJobContext(job, task, task_callable)
yield djc
djc.stop_delegating()