# Source code for zsl.resource.model_resource

"""
:mod:`zsl.resource.model_resource` -- REST for a DB model.
----------------------------------------------------------

Resources provide a way to directly access DB tables (raw models) and
perform CRUD operations upon them. The default classes of the models should be
overridden to provide more logic or restrictions if wanted.

The basic way to use them is as follows:
 - The models can be defined in the __exposer__.py which is configured in the
   settings.
 - To customize the behaviour, override the basic methods - `create`, `read`,
   `update`, `delete`.

.. moduleauthor:: Peter Morihladko <peter@atteq.com>, Martin Babka <babka@atteq.com>
"""
from hashlib import sha256
import json
import logging
from typing import Any, List, Union

from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import InstrumentedAttribute

from zsl import inject
from zsl.cache.cache_module import CacheModule
from zsl.cache.id_helper import IdHelper, create_key_class_prefix
from zsl.db.helpers import app_models
from zsl.db.helpers.nested import nested_model, nested_models
from zsl.db.model.app_model import AppModel
from zsl.resource.resource_helper import (apply_related, create_related_tree, filter_from_url_arg, order_from_url_arg,
                                          related_from_fields)
from zsl.service.service import TransactionalSupportMixin, transactional
from zsl.utils.cache_helper import app_model_decoder_fn, app_model_encoder_fn


def dict_pick(dictionary, allowed_keys):
    """
    Build a new dictionary restricted to the keys listed in `allowed_keys`.

    :param dict dictionary: The source dictionary; it is not modified.
    :param allowed_keys: Container of the keys which may be kept.
    :return: A new dictionary holding only the allowed items of `dictionary`.
    """
    picked = {}
    for name, item in dictionary.items():
        if name not in allowed_keys:
            continue
        picked[name] = item
    return picked
def page_to_offset(params):
    """
    Replace the `page`/`per_page` pagination values in `params` with the
    `limit`/`offset` pair suitable for SQL.

    The dictionary is modified in place and nothing is returned. When there is
    no `page` key the dictionary is left untouched.

    :param dict params: The dictionary containing the `page` and `per_page`
                        values; the values `limit` and `offset` are added to it.
    """
    if 'page' not in params:
        return

    page = params.pop('page')

    # 'per_page' is just an alias for 'limit'
    if 'per_page' in params:
        per_page = params.pop('per_page')
    else:
        per_page = params.get('limit', 10)

    # Pages are numbered from 1, hence the first page has the offset 0.
    params['offset'] = (int(page) - 1) * int(per_page)
    params['limit'] = per_page
def _is_list(list_):
    # type: (Any) -> bool
    """Test if variable is a list-like collection.

    Anything which supports ``len()`` is considered to be a list, with the
    exception of strings and bytes: these are sized as well, but they represent
    a single scalar value (e.g. one identifier taken from the URL) and must not
    be indexed character by character.

    Maybe this is not the best way to test for a list, but it is sufficient
    for the current use case.

    :param list_: The tested value.
    :return: `True` if the value is a sized, non-string object.
    """
    if isinstance(list_, (str, bytes)):
        return False

    try:
        len(list_)
    except TypeError:
        return False
    return True
class ResourceQueryContext:
    """
    The context of the resource query.

     - holds the parameters and arguments of the query,
     - holds the related models which should be fetched (parsed from the
       arguments),
     - holds the given filter and splits it to the given field array (parsed
       from the arguments)

    .. automethod:: __init__
    """

    def __init__(self, params, args, data):
        # type: (list, dict, dict) -> None
        """
        :param params: The path parameters; a value which is not a list is
                       wrapped into a single-element list.
        :param args: The query arguments. The dictionary is copied, so the
                     caller's dictionary is never modified; `None` is treated
                     as no arguments.
        :param data: The body of the request.
        """
        self._args_original = args
        self._args = {} if args is None else args.copy()
        self._data = data
        self._params = params if _is_list(params) else [params]

        # Prepare fields and related.
        if 'related' in self._args:
            self._args['related'] = self._args['related'].split(',')

        if 'fields' in self._args:
            self._args['fields'] = self._args['fields'].split(',')
            # Related models may be requested through `fields` as well, so
            # `related` becomes the union of both lists.
            self._args['related'] = list(
                set(self._args.get('related', [])) | set(related_from_fields(self._args['fields'])))

    @property
    def params(self):
        """Params are given as the part of the path in URL. For example
        GET /entities/1 will have 1 in the params.
        """
        return self._params

    @property
    def args(self):
        """Args are in the query part of the url ?related=&filter_by etc."""
        return self._args

    @property
    def data(self):
        """Body of the request."""
        return self._data

    def get_row_id(self):
        """First parameter, if given, else None. Handy for GET requests."""
        return None if len(self.params) == 0 else self.params[0]

    def get_related(self):
        """Related argument - list of the related models, `None` if not given."""
        return self._args.get('related', None)

    def get_filter_by(self):
        """Filter argument - list of filters."""
        return self._args.get('filter_by', None)
class ModelResourceBase(TransactionalSupportMixin):
    """ModelResource works only for tables with a single-column identifier
    (key).

    .. automethod:: __init__
    """

    def __init__(self, model_cls=None):
        """
        Create Model CRUD resource for ``model_cls``.

        :param model_cls: The mapped model class. When it is not given the
                          class attribute ``__model__`` is used instead.
        """
        super().__init__()

        self.model_cls = model_cls if model_cls else self.__model__

        mapper = class_mapper(self.model_cls)
        # Only the first primary key column is used, see the class docstring.
        self._model_pk = mapper.primary_key[0]
        self._model_columns = [column.key for column in mapper.column_attrs]
        self._related_columns = [column.key for column in mapper.relationships]

    def to_filter(self, query, arg):
        """Applies the filter given by the URL argument `arg` to `query`."""
        return filter_from_url_arg(self.model_cls, query, arg)

    def add_related(self, query, related):
        """Extends `query` so that the `related` models are fetched as well.

        The subclasses call this method whenever the `related` argument is
        present in the request.
        """
        # NOTE(review): the argument order mirrors `filter_from_url_arg` and
        # `order_from_url_arg` - confirm against `apply_related`.
        return apply_related(self.model_cls, query, related)

    def set_ordering(self, query, arg):
        """Applies the ordering given by the URL argument `arg` to `query`."""
        return order_from_url_arg(self.model_cls, query, arg)
class ModelResource(ModelResourceBase):
    """
    The CRUD resource of a single model.

    The public entry points are :meth:`create`, :meth:`read`, :meth:`update`
    and :meth:`delete`. Each of them builds a :class:`ResourceQueryContext`
    and delegates the work to the underscore-prefixed methods which are meant
    to be overridden.

    .. automethod:: _create_context
    .. automethod:: _create_one
    .. automethod:: _save_one
    .. automethod:: _delete_one
    .. automethod:: _create_delete_one_query
    """

    @staticmethod
    def _create_context(params, args, data):
        # type: (list, dict, dict) -> ResourceQueryContext
        """
        Creates the resource query context - this is an object holding the data
        alongside the querying of the resource. This object is always present
        as a parameter for each method during the query and users are free to
        create own properties so that they can optimize and perform the query
        (so the subsequent methods have an access to the already precomputed
        data).
        """
        return ResourceQueryContext(params, args, data)

    @transactional
    def create(self, params, args, data):
        # type: (str, dict, dict) -> AppModel
        """
        POST /resource/model_cls/
        data

        Create new resource
        """
        ctx = self._create_context(params, args, data)
        model = self._create_one(ctx)
        self._save_one(model, ctx)
        return self._return_saved_one(model, ctx)

    def read(self, params=None, args=None, data=None):
        # type: (str, dict, dict) -> Union[List[AppModel], AppModel]
        """
        GET /resource/model_cls/[params:id]?[args:{limit,offset,page,per_page,filter_by,order_by,related,fields}]

        Get resource/s

        :param params: The path parameters, the first one is the row id.
        :type params: list
        :param args: The query arguments.
        :type args: dict
        :param data: The body of the request.
        :type data: dict
        """
        if params is None:
            params = []
        if args is None:
            args = {}

        ctx = self._create_context(params, args, data)
        row_id = ctx.get_row_id()

        # Truthiness check: a falsy id (None, 0, '') means "no id given" and
        # the request is handled as a collection request.
        if row_id:
            return self._get_one(row_id, ctx)
        elif 'count' in args:
            return self._get_collection_count(ctx)
        elif 'desc' in args:
            return self._get_collection_desc(ctx)
        else:
            if 'page' in ctx.args:
                # Works on the context's copy of the arguments.
                page_to_offset(ctx.args)
            return self._get_collection(ctx)

    @transactional
    def update(self, params, args, data):
        # type: (str, dict, dict) -> Union[List[AppModel], AppModel]
        """
        PUT /resource/model_cls/[params:id]
        data

        Update resource/s
        """
        ctx = self._create_context(params, args, data)
        row_id = ctx.get_row_id()

        if row_id is not None:
            model = self._update_one(ctx)
            return None if model is None else model.get_app_model()
        else:
            return app_models(self._update_collection(ctx))

    @transactional
    def delete(self, params, args, data):
        # type: (str, dict, dict) -> None
        """
        DELETE /resource/model_cls/[params]?[args]

        delete resource/s
        """
        ctx = self._create_context(params, args, data)
        row_id = ctx.get_row_id()

        if row_id is not None:
            return self._delete_one(row_id, ctx)
        else:
            return self._delete_collection(ctx)

    # Create implementation

    def _create_one(self, ctx):
        """
        Creates an instance to be saved when a model is created.

        Only the keys of the request body which are the columns of the model
        are passed to the model constructor.
        """
        assert isinstance(ctx, ResourceQueryContext)

        fields = dict_pick(ctx.data, self._model_columns)
        model = self.model_cls(**fields)
        return model

    def _save_one(self, model, ctx):
        """
        Saves the created instance.
        """
        assert isinstance(ctx, ResourceQueryContext)

        self._orm.add(model)
        self._orm.flush()

    @staticmethod
    def _return_saved_one(model, ctx):
        """
        Returns the result of the create operation.
        """
        return model.get_app_model()

    # Read one implementation.

    @transactional
    def _get_one(self, row_id, ctx):
        """
        Reads the row identified by `row_id`, together with the related models
        when they are requested in the context.
        """
        assert isinstance(ctx, ResourceQueryContext)

        q = self._orm.query(self.model_cls).filter(self._model_pk == row_id)

        related = ctx.get_related()
        if related is not None:
            q = self.add_related(q, related)
            return nested_model(self._read_one(q, ctx), create_related_tree(related))
        else:
            return self._read_one(q, ctx).get_app_model()

    @staticmethod
    def _read_one(q, ctx):
        """Executes the single row query `q`."""
        return q.one()

    # Read collection implementation.

    def _create_collection_query(self, ctx):
        """Creates the base query used for reading and counting a collection."""
        return self._orm.query(self.model_cls)

    @transactional
    def _get_collection_count(self, ctx):
        """Counts the rows of the collection, optionally filtered by ``filter_by``."""
        assert isinstance(ctx, ResourceQueryContext)

        filter_by = ctx.get_filter_by()
        q = self._create_collection_query(ctx)

        if filter_by is not None:
            q = self.to_filter(q, filter_by)

        return self._read_collection_count(q, ctx)

    def _read_collection_count(self, q, ctx):
        """Executes the count query, selecting only the primary key column."""
        return q.with_entities(self._model_pk).count()

    @transactional
    def _get_collection(self, ctx):
        """
        Reads the collection with the filter, ordering and related models
        given in the context.
        """
        assert isinstance(ctx, ResourceQueryContext)

        order_by = ctx.args.get('order_by', None)
        filter_by = ctx.get_filter_by()
        related = ctx.get_related()

        q = self._create_collection_query(ctx)

        if filter_by is not None:
            q = self.to_filter(q, filter_by)

        if order_by is not None:
            q = self.set_ordering(q, order_by)

        if related is not None:
            q = self.add_related(q, related)
            return nested_models(self._read_collection(q, ctx), create_related_tree(related))
        else:
            return app_models(self._read_collection(q, ctx))

    @staticmethod
    def _read_collection(q, ctx):
        """
        Executes the collection query `q` with the `limit` and `offset` taken
        from the context arguments. The default limit is 10; the limit
        ``'unlimited'`` (or `None`) switches the limit off.
        """
        offset = int(ctx.args.get('offset', 0))
        limit = ctx.args.get('limit', 10)

        if limit is not None and limit != 'unlimited':
            q = q.limit(int(limit))

        if offset > 0:
            q = q.offset(offset)

        return q.all()

    def _get_collection_desc(self, ctx=None):
        """
        Returns the names of the columns of the model.

        :param ctx: The query context. It is accepted because :meth:`read`
                    passes it, the default implementation does not use it.
        """
        return [column.name for column in class_mapper(self.model_cls).columns]

    # Update

    def _update_one_simple(self, row_id, fields, ctx):
        """
        Sets the given `fields` on the row identified by `row_id`.

        :return: The updated model or `None` when there is no such row.
        """
        fields = dict_pick(fields, self._model_columns)

        model = self._orm.query(self.model_cls).get(row_id)

        if model is None:
            return None

        for name, value in fields.items():
            setattr(model, name, value)

        return model

    def _update_one(self, ctx):
        """
        Update row
        """
        assert isinstance(ctx, ResourceQueryContext)

        fields = ctx.data
        row_id = ctx.get_row_id()

        return self._update_one_simple(row_id, fields, ctx)

    def _update_collection(self, ctx):
        """
        Bulk update
        """
        assert isinstance(ctx, ResourceQueryContext)

        models = []

        for row in ctx.data:
            # The identifier is removed from the row so that it is not set as
            # an ordinary field.
            models.append(self._update_one_simple(row.pop('id'), row, ctx))

        return models

    # Delete methods

    def _delete_one(self, row_id, ctx):
        """
        Deletes row by the given id -- `row_id`. The method first creates the
        query using the method :meth:`_create_delete_one_query` and then
        executes it.

        :param int row_id: Identifier of the deleted row.
        :param ResourceQueryContext ctx: The context of this delete query.
        """
        return self._create_delete_one_query(row_id, ctx).delete()

    def _create_delete_one_query(self, row_id, ctx):
        """
        Delete row by id query creation.

        :param int row_id: Identifier of the deleted row.
        :param ResourceQueryContext ctx: The context of this delete query.
        """
        assert isinstance(ctx, ResourceQueryContext)

        pk_name = self._model_pk.name
        return self._orm.query(self.model_cls).filter(getattr(self.model_cls, pk_name) == row_id)

    def _delete_collection(self, ctx):
        """
        Delete a collection from DB, optionally filtered by ``filter_by``
        """
        assert isinstance(ctx, ResourceQueryContext)

        filter_by = ctx.get_filter_by()
        q = self._orm.query(self.model_cls)

        if filter_by is not None:
            q = self.to_filter(q, filter_by)

        return q.delete()
class CachedModelResource(ModelResource):
    """
    The cached resource - uses redis to cache the resource for the given amount
    of seconds.

    The results of the read operations are stored under a key derived from the
    model class and a hash of the request (params, args and data). Every
    `create`, `update` and `delete` invalidates all the keys of the model.
    """

    @inject(cache_module=CacheModule, id_helper=IdHelper, logger=logging.Logger)
    def __init__(self, model_cls, cache_module, id_helper, logger, timeout='short'):
        super().__init__(model_cls)
        self._cache_module = cache_module
        self._id_helper = id_helper
        self._logger = logger
        self._timeout = timeout

    def _create_key(self, arghash):
        """Creates the cache key for the given hash of the request."""
        key_prefix = create_key_class_prefix(self.model_cls)
        return "cached-resource:{0}:{1}".format(key_prefix, arghash)

    def _create_key_from_context(self, ctx):
        """Creates the cache key from the params, args and data of the context."""
        # `sort_keys` keeps the key independent of the dictionary ordering and
        # the hash functions accept only bytes, hence the explicit encoding.
        serialized = json.dumps({'params': ctx.params, 'args': ctx.args, 'data': ctx.data}, sort_keys=True)
        arghash = sha256(serialized.encode('utf-8')).hexdigest()
        return self._create_key(arghash)

    def _get_one(self, row_id, ctx):
        # Make hash of params, args and data and cache using the hash in the key.
        key = self._create_key_from_context(ctx)
        self._logger.debug("CachedModelResource - get one, key: {0}.".format(key))

        if self._id_helper.check_key(key):
            result = json.loads(self._id_helper.get_key(key))
        else:
            self._logger.debug("CachedModelResource - get one not cached, transferring to resource...")
            result = super()._get_one(row_id, ctx)
            # serialize as model
            self._id_helper.set_key(key, app_model_encoder_fn(result), self._timeout)

        # A read must not invalidate the cache - that would drop the entry
        # stored just above together with all the other entries of the model.
        return result

    def _get_collection_count(self, ctx):
        # Make hash of params, args and data and cache using the hash in the key.
        key = self._create_key_from_context(ctx)
        self._logger.debug("CachedModelResource - collection count, key: {0}.".format(key))

        if self._id_helper.check_key(key):
            result = int(self._id_helper.get_key(key))
        else:
            self._logger.debug("CachedModelResource - collection count not cached, transferring to resource...")
            result = super()._get_collection_count(ctx)
            self._id_helper.set_key(key, app_model_encoder_fn(result), self._timeout)

        return result

    def _get_collection(self, ctx):
        # Make hash of params, args and data and cache using the hash in the key.
        key = self._create_key_from_context(ctx)
        self._logger.debug("CachedModelResource - collection, key: {0}.".format(key))

        if self._id_helper.check_key(key):
            result = self._id_helper.gather_page(key, app_model_decoder_fn)
        else:
            self._logger.debug("CachedModelResource - collection not cached, transferring to resource...")
            result = super()._get_collection(ctx)
            self._id_helper.fill_page(key, result, self._timeout, app_model_encoder_fn)

        return result

    def invalidate(self):
        """
        Invalidates all the data associated with this model
        """
        # The key built from an empty hash is the common prefix of all the
        # keys of this model.
        key = self._create_key("")
        self._id_helper.invalidate_keys_by_prefix(key)

    def create(self, params, args, data):
        """Creates the resource and invalidates the cached data of the model."""
        rv = ModelResource.create(self, params, args, data)
        self.invalidate()
        return rv

    def update(self, params, args, data):
        """Updates the resource/s and invalidates the cached data of the model."""
        rv = ModelResource.update(self, params, args, data)
        self.invalidate()
        return rv

    def delete(self, params, args, data):
        """Deletes the resource/s and invalidates the cached data of the model."""
        rv = ModelResource.delete(self, params, args, data)
        self.invalidate()
        return rv
class ReadOnlyResourceUpdateOperationException(Exception):
    """Raised when a modifying operation is requested on a read-only resource."""

    def __init__(self, operation):
        """
        :param operation: The name of the refused operation; it is a part of
                          the exception message.
        """
        message = "Can not perform operation '{0}' on ReadOnlyResource.".format(operation)
        self._operation = operation
        super().__init__(message)

    def get_operation(self):
        """Returns the name of the refused operation."""
        return self._operation

    @property
    def operation(self):
        """The name of the refused operation."""
        return self._operation
class ReadOnlyResourceMixin:
    """
    The mixin to be used to forbid the update/delete and create operations.
    Remember the Python's MRO and place this mixin at the right place in the
    inheritance declaration.

    .. automethod:: create
    .. automethod:: update
    .. automethod:: delete
    """

    # The operation names carried by the raised exceptions.
    OPERATION_CREATE = 'create'
    OPERATION_UPDATE = 'update'
    OPERATION_DELETE = 'delete'

    @staticmethod
    def create(params, args, data):
        """Always fails - creating is not allowed on a read-only resource.

        The arguments are accepted only to keep the resource interface.

        :raises ReadOnlyResourceUpdateOperationException: when accessed
        """
        operation = ReadOnlyResourceMixin.OPERATION_CREATE
        raise ReadOnlyResourceUpdateOperationException(operation)

    @staticmethod
    def update(params, args, data):
        """Always fails - updating is not allowed on a read-only resource.

        The arguments are accepted only to keep the resource interface.

        :raises ReadOnlyResourceUpdateOperationException: when accessed
        """
        operation = ReadOnlyResourceMixin.OPERATION_UPDATE
        raise ReadOnlyResourceUpdateOperationException(operation)

    @staticmethod
    def delete(params, args, data):
        """Always fails - deleting is not allowed on a read-only resource.

        The arguments are accepted only to keep the resource interface.

        :raises ReadOnlyResourceUpdateOperationException: when accessed
        """
        operation = ReadOnlyResourceMixin.OPERATION_DELETE
        raise ReadOnlyResourceUpdateOperationException(operation)