"""
:mod:`zsl.task.task_decorator`
------------------------------
.. moduleauthor:: Martin Babka <babka@atteq.com>,
Peter Morihladko <morihladko@atteq.com>,
Jan Janco <janco@atteq.com>,
Lubos Pis <pis@atteq.com>
"""
from datetime import timedelta
from functools import wraps
import json
import logging
from os.path import os
import traceback
from typing import Callable, List, Union
from flask import request
from zsl import Config, Injected, Zsl, inject
from zsl.application.modules.web.cors import CORSConfiguration
from zsl.constants import MimeType
from zsl.db.model import AppModelJSONEncoder
from zsl.task.job_context import JobContext, Responder, WebJobContext, add_responder
from zsl.task.task_data import TaskData
from zsl.utils.file_helper import makedirs
from zsl.utils.security_helper import verify_security_data
from zsl.utils.string_helper import join_list
[docs]
def log_output(f):
"""
Logs the output value.
"""
@wraps(f)
def wrapper_fn(*args, **kwargs):
res = f(*args, **kwargs)
logging.debug("Logging result %s.", res)
return res
return wrapper_fn
[docs]
def save_to_file(destination_filename, append=False):
"""
Save the output value to file.
"""
def decorator_fn(f):
@wraps(f)
def wrapper_fn(*args, **kwargs):
res = f(*args, **kwargs)
makedirs(os.path.dirname(destination_filename))
mode = "a" if append else "w"
with open(destination_filename, mode) as text_file:
text_file.write(res)
return res
return wrapper_fn
return decorator_fn
[docs]
def json_output(f):
"""
Format response to json and in case of web-request set response content type
to 'application/json'.
"""
@wraps(f)
def json_output_decorator(*args, **kwargs):
@inject(config=Config)
def get_config(config):
return config
config = get_config()
rv = f(*args, **kwargs)
indent = None
if config.get('DEBUG', False):
logging.getLogger(__name__).debug("Formatting JSON nicely.")
indent = 2
rv = json.dumps(rv, cls=AppModelJSONEncoder, indent=indent)
_set_json_response_content_type()
return rv
return json_output_decorator
def _set_json_response_content_type():
responder = MimeSetterWebTaskResponder(MimeType.APPLICATION_JSON.value)
add_responder(responder)
[docs]
def jsonp_wrap(callback_key='callback'):
"""
Format response to jsonp and add a callback to JSON data - a jsonp request
"""
def decorator_fn(f):
@wraps(f)
def jsonp_output_decorator(*args, **kwargs):
task_data = _get_data_from_args(args)
data = task_data.payload
if callback_key not in data:
raise KeyError(
'Missing required parameter "{0}" for task.'.format(
callback_key))
callback = data[callback_key]
jsonp = f(*args, **kwargs)
if isinstance(JobContext.get_current_context(), WebJobContext):
JobContext.get_current_context().add_responder(
MimeSetterWebTaskResponder('application/javascript'))
jsonp = "{callback}({data})".format(callback=callback, data=jsonp)
return jsonp
return jsonp_output_decorator
return decorator_fn
[docs]
def jsend_output(fail_exception_classes=None):
"""
Format task result to json output in jsend specification format. See:
https://github.com/omniti-labs. Task return value must be dict or None.
@param fail_exception_classes: exceptions which will produce 'fail' response
status.
"""
fail_exception_classes = fail_exception_classes if fail_exception_classes \
else ()
def decorator_fn(f):
@wraps(f)
@json_output
def jsend_output_decorator(*args, **kwargs):
try:
rv = f(*args, **kwargs)
except fail_exception_classes as e:
return {'status': 'fail', 'data': {'message': str(e)}}
except Exception as e:
logging.error(str(e) + "\n" + traceback.format_exc())
return {'status': 'error', 'message': 'Server error.'}
if not isinstance(rv, dict) and rv is not None:
msg = 'jsend_output decorator error: task must return dict ' \
'or None.\nTask return value: {0}.'
logging.error(msg.format(rv))
return {'status': 'error', 'message': 'Server error.'}
return {'status': 'success', 'data': rv}
return jsend_output_decorator
return decorator_fn
[docs]
def web_error_and_result(f):
"""
Same as error_and_result decorator, except:
If no exception was raised during task execution, ONLY IN CASE OF WEB
REQUEST formats task result into json dictionary {'data': task return value}
"""
@wraps(f)
def web_error_and_result_decorator(*args, **kwargs):
return error_and_result_decorator_inner_fn(f, True, *args, **kwargs)
return web_error_and_result_decorator
[docs]
def error_and_result(f):
"""
Format task result into json dictionary `{'data': task return value}` if no
exception was raised during the task execution. If there was raised an
exception during task execution, formats task result into dictionary
`{'error': exception message with traceback}`.
"""
@wraps(f)
def error_and_result_decorator(*args, **kwargs):
return error_and_result_decorator_inner_fn(f, False, *args, **kwargs)
return error_and_result_decorator
[docs]
def error_and_result_decorator_inner_fn(f, web_only, *args, **kwargs):
try:
ret_val = f(*args, **kwargs)
if web_only and not isinstance(JobContext.get_current_context(),
WebJobContext):
rv = ret_val
else:
rv = {'data': ret_val}
except: # NOQA
exc = traceback.format_exc()
logging.error(exc)
rv = {'error': "{0}".format(exc)}
return json.dumps(rv)
[docs]
def required_data(*data):
"""
Task decorator which checks if the given variables (indices) are stored
inside the task data.
"""
def decorator_fn(f):
@wraps(f)
def required_data_decorator(*args, **kwargs):
task_data = _get_data_from_args(args).payload
for i in data:
if i not in task_data:
raise KeyError(
'Missing required parameter "{0}" for task.'.format(i))
return f(*args, **kwargs)
return required_data_decorator
return decorator_fn
[docs]
def append_get_parameters(accept_only_web=True):
# type: (bool) -> Callable
"""
Task decorator which appends the GET data to the task data.
:param accept_only_web: Parameter which limits using this task only
with web requests.
"""
def wrapper(f):
@wraps(f)
def append_get_parameters_wrapper_fn(*args, **kwargs):
jc = JobContext.get_current_context()
if isinstance(jc, WebJobContext):
# Update the data with GET parameters
web_request = jc.get_web_request()
task_data = _get_data_from_args(args)
data = task_data.payload
data.update(web_request.args.to_dict(flat=True))
elif accept_only_web:
# Raise exception on non web usage if necessary
raise Exception("append_get_parameters decorator may be used "
"with GET requests only.")
return f(*args, **kwargs)
return append_get_parameters_wrapper_fn
return wrapper
[docs]
def web_task(f):
"""
Checks if the task is called through the web interface.
Task return value should be in format {'data': ...}.
"""
@wraps(f)
def web_task_decorator(*args, **kwargs):
jc = JobContext.get_current_context()
if not isinstance(jc, WebJobContext):
raise Exception(
"The WebTask is not called through the web interface.")
data = f(*args, **kwargs)
jc.add_responder(WebTaskResponder(data))
return data['data'] if 'data' in data else ""
return web_task_decorator
[docs]
def secured_task(f):
"""
Secured task decorator.
"""
@wraps(f)
def secured_task_decorator(*args, **kwargs):
task_data = _get_data_from_args(args)
assert isinstance(task_data, TaskData)
if not verify_security_data(task_data.payload['security']):
raise SecurityException(
task_data.payload['security']['hashed_token'])
task_data.transform_payload(lambda x: x.get('data', {}))
return f(*args, **kwargs)
return secured_task_decorator
[docs]
def xml_output(f):
"""
Set content-type for response to WEB-REQUEST to 'text/xml'
"""
@wraps(f)
def xml_output_inner_fn(*args, **kwargs):
ret_val = f(*args, **kwargs)
if isinstance(JobContext.get_current_context(), WebJobContext):
JobContext.get_current_context().add_responder(
MimeSetterWebTaskResponder('text/xml'))
return ret_val
return xml_output_inner_fn
[docs]
def file_upload(f):
"""
Return list of `werkzeug.datastructures.FileStorage` objects - files to be
uploaded
"""
@wraps(f)
def file_upload_decorator(*args, **kwargs):
# If the data is already transformed, we do not transform it any
# further.
task_data = _get_data_from_args(args)
if task_data is None:
logging.error("Task data is empty during FilesUploadDecorator.")
task_data.transform_payload(lambda _: request.files.getlist('file'))
return f(*args, **kwargs)
return file_upload_decorator
def _get_data_from_args(args):
task_data = None
for d in args:
if isinstance(d, TaskData):
task_data = d
return task_data
[docs]
class SecurityException(Exception):
def __init__(self, hashed_token):
Exception.__init__(self,
"Invalid hashed token '{0}'.".format(hashed_token))
self._hashed_token = hashed_token
[docs]
def get_hashed_token(self):
return self._hashed_token
[docs]
class WebTaskResponder(Responder):
def __init__(self, data):
self.data = data
[docs]
def respond(self, response):
for k in self.data:
if k == 'headers':
for header_name in self.data[k]:
response.headers[header_name] = self.data[k][header_name]
else:
setattr(response, k, self.data[k])
[docs]
class MimeSetterWebTaskResponder(Responder):
def __init__(self, mime):
self._mime = mime
[docs]
def respond(self, r):
r.content_type = self._mime
[docs]
class CrossdomainWebTaskResponder(Responder):
"""
source: https://github.com/fengsp/flask-snippets/blob/master/decorators/http_access_control.py
"""
@inject(app=Zsl, cors_config=CORSConfiguration)
def __init__(self, origin=None, methods=None, allow_headers=None,
expose_headers=None, max_age=None, app=Injected,
cors_config=Injected):
# type: (str, List[str], str, str, Union[int, timedelta], Zsl, CORSConfiguration)->None
self._app = app
methods = join_list(methods, transform=lambda x: x.upper())
self.methods = methods
if allow_headers is None:
allow_headers = cors_config.allow_headers
allow_headers = join_list(allow_headers)
self.allow_headers = allow_headers
if expose_headers is None:
expose_headers = cors_config.expose_headers
expose_headers = join_list(expose_headers)
self.expose_headers = expose_headers
if origin is None:
origin = cors_config.origin
self.origin = join_list(origin)
if max_age is None:
max_age = cors_config.max_age
if isinstance(max_age, timedelta):
max_age = max_age.total_seconds()
self.max_age = max_age
[docs]
def get_methods(self):
if self.methods is not None:
return self.methods
options_resp = self._app.make_default_options_response()
return options_resp.headers['allow']
[docs]
def respond(self, response):
headers = response.headers
headers['Access-Control-Allow-Origin'] = self.origin
headers['Access-Control-Allow-Methods'] = self.get_methods()
headers['Access-Control-Max-Age'] = str(self.max_age)
headers['Access-Control-Allow-Headers'] = self.allow_headers
headers['Access-Control-Expose-Headers'] = self.expose_headers
return response
[docs]
def crossdomain(origin=None, methods=None, allow_headers=None,
expose_headers=None, max_age=None):
def decorator(f):
@wraps(f)
def crossdomain_inner_fn(*args, **kwargs):
responder = CrossdomainWebTaskResponder(
origin, methods, allow_headers, expose_headers, max_age
)
rv = f(*args, **kwargs)
add_responder(responder)
return rv
return crossdomain_inner_fn
return decorator
[docs]
def forbid_web_access(f):
"""
Forbids running task using http request.
:param f: Callable
:return Callable
"""
@wraps(f)
def wrapper_fn(*args, **kwargs):
if isinstance(JobContext.get_current_context(), WebJobContext):
raise ForbiddenError('Access forbidden from web.')
return f(*args, **kwargs)
return wrapper_fn
[docs]
class ForbiddenError(Exception):
pass