Source code for fobi.contrib.apps.drf_integration.dynamic

from __future__ import unicode_literals

import copy
from collections import OrderedDict
from collections.abc import Mapping

import six
from django.core.exceptions import ValidationError as DjangoValidationError
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ErrorDetail, ValidationError
from rest_framework.fields import SkipField, empty, get_error_detail, set_value
from rest_framework.relations import PKOnlyObject
from rest_framework.serializers import BaseSerializer
from rest_framework.settings import api_settings
from rest_framework.utils import html, representation
from rest_framework.utils.serializer_helpers import (
    BindingDict,
    BoundField,
    NestedBoundField,
    ReturnDict,
)

from . import UID as INTEGRATE_WITH_UID

__title__ = "fobi.contrib.apps.drf_integration.dynamic"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2014-2022 Artur Barseghyan"
__license__ = "GPL 2.0/LGPL 2.1"
__all__ = (
    "assemble_serializer_class",
    "get_declared_fields",
)

# ****************************************************************************
# ****************************************************************************
# ************************ Serializer generator ******************************
# ****************************************************************************
# ****************************************************************************


[docs]def get_declared_fields( form_entry, request=None, origin=None, origin_kwargs_update_func=None, origin_return_func=None, form_element_entries=None, has_value=None, ): """Get declared fields.""" declared_fields = [] declared_fields_metadata = [] if form_element_entries is None: form_element_entries = form_entry.formelemententry_set.all() for creation_counter, form_element_entry in enumerate(form_element_entries): plugin = form_element_entry.get_plugin(request=request) # We simply make sure the plugin exists. We don't handle # exceptions relate to the non-existent plugins here. They # are instead handled in registry. if plugin: plugin_custom_field_instances = plugin._get_custom_field_instances( integrate_with=INTEGRATE_WITH_UID, form_element_entry=form_element_entry, origin=origin, kwargs_update_func=origin_kwargs_update_func, return_func=origin_return_func, extra={"counter": creation_counter}, request=request, has_value=has_value, ) for plugin_custom_field_instance in plugin_custom_field_instances: # The serializer field class custom_field_class = plugin_custom_field_instance.field_class( **plugin_custom_field_instance.field_kwargs ) # Since serializer fields do not accept **kwargs, we need # to assign `declared_fields_metadata` in the same way as we # do with `declared_fields`. declared_fields_metadata.append( ( plugin_custom_field_instance.data.name, plugin_custom_field_instance.field_metadata, ) ) declared_fields.append( ( plugin_custom_field_instance.data.name, custom_field_class, ) ) return OrderedDict(declared_fields), OrderedDict(declared_fields_metadata)
[docs]def assemble_serializer_class( form_entry, request=None, origin=None, origin_kwargs_update_func=None, origin_return_func=None, form_element_entries=None, has_value=None, declared_fields=None, declared_fields_metadata=None, ): """Assemble a serializer class by given entry. :param form_entry: :param base_class: :param django.http.HttpRequest request: :param string origin: :param callable origin_kwargs_update_func: :param callable origin_return_func: :param iterable form_element_entries: If given, used instead of ``form_entry.formelemententry_set.all`` (no additional database hit). :param bool has_value: """ # if form_element_entries is None: # form_element_entries = form_entry.formelemententry_set.all() # class DeclarativeFieldsSerializerMetaclass(SerializerMetaclass): # """Meta-class for setting ``_declared_fields`` dictionary. # # Any instances of ``Field`` included as attributes on either the class # or on any of its superclasses will be include in the # ``_declared_fields`` dictionary. # """ # # def __new__(cls, name, bases, attrs): # # # Get the declared fields dict. # declared_fields = get_declared_fields( # form_entry, # origin=origin, # origin_kwargs_update_func=origin_kwargs_update_func, # origin_return_func=origin_return_func, # request=request, # form_element_entries=form_element_entries, # ) # attrs['_declared_fields'] = declared_fields # # new_class = super( # DeclarativeFieldsSerializerMetaclass, cls # ).__new__( # cls, name, bases, attrs # ) # # if 'media' not in attrs: # new_class.media = media_property(new_class) # # return new_class # # class DynamicSerializer( # with_metaclass(DeclarativeFieldsSerializerMetaclass, base_class) # ): # """Dynamically created form element plugin class.""" # # return DynamicSerializer if declared_fields is None: declared_fields, declared_fields_metadata = get_declared_fields( form_entry, origin=origin, origin_kwargs_update_func=origin_kwargs_update_func, origin_return_func=origin_return_func, request=request, form_element_entries=form_element_entries, has_value=has_value, ) # Most of the code below has been copied from rest_framework.serializers # module with a few modifications. Simplifying this is a TODO, # thus if you know a better way of doing this (reducing the code), # please do not hesitate to make a pull request. class SerializerMetaclass(type): """ This metaclass sets a dictionary named `_declared_fields` on the class. Any instances of `Field` included as attributes on either the class or on any of its superclasses will be include in the `_declared_fields` dictionary. """ @classmethod def _get_declared_fields(cls, bases, attrs): """Modified version of the original _get_declared_fields.""" fields = [ (field_name, obj) for field_name, obj in declared_fields.items() if field_name not in attrs ] return OrderedDict(fields) @classmethod def _get_declared_fields_metadata(cls, bases, attrs): """Similar to _get_declared_fields, but for metadata.""" fields = [ (field_name, obj) for field_name, obj in declared_fields_metadata.items() if field_name not in attrs ] return OrderedDict(fields) def __new__(cls, name, bases, attrs): """Modified version of the original __new__.""" attrs["_declared_fields"] = cls._get_declared_fields(bases, attrs) attrs[ "_declared_fields_metadata" ] = cls._get_declared_fields_metadata(bases, attrs) return super(SerializerMetaclass, cls).__new__( cls, name, bases, attrs ) def as_serializer_error(exc): assert isinstance(exc, (ValidationError, DjangoValidationError)) if isinstance(exc, DjangoValidationError): detail = get_error_detail(exc) else: detail = exc.detail if isinstance(detail, Mapping): # If errors may be a dict we use the standard # {key: list of values}. Here we ensure that all the values are # *lists* of errors. return { key: value if isinstance(value, (list, Mapping)) else [value] for key, value in detail.items() } elif isinstance(detail, list): # Errors raised as a list are non-field errors. return {api_settings.NON_FIELD_ERRORS_KEY: detail} # Errors raised as a string are non-field errors. return {api_settings.NON_FIELD_ERRORS_KEY: [detail]} @six.add_metaclass(SerializerMetaclass) class Serializer(BaseSerializer): default_error_messages = { "invalid": _( "Invalid data. Expected a dictionary, but got {datatype}." ) } @property def fields(self): """ A dictionary of {field_name: field_instance}. """ # `fields` is evaluated lazily. We do this to ensure that we don't # have issues importing modules that use ModelSerializers as # fields, even if Django's app-loading stage has not yet run. if not hasattr(self, "_fields"): self._fields = BindingDict(self) for key, value in self.get_fields().items(): self._fields[key] = value return self._fields @cached_property def _writable_fields(self): return [ field for field in self.fields.values() if (not field.read_only) or (field.default is not empty) ] @cached_property def _readable_fields(self): return [ field for field in self.fields.values() if not field.write_only ] def get_fields(self): """ Returns a dictionary of {field_name: field_instance}. """ # Every new serializer is created with a clone of the field # instances. This allows users to dynamically modify the fields # on a serializer instance without affecting every other # serializer class. return copy.deepcopy(self._declared_fields) def get_fields_metadata(self, field_name=None): """ Returns a dictionary of {field_name: field_instance}. """ # Every new serializer is created with a clone of the field # instances. This allows users to dynamically modify the fields # on a serializer instance without affecting every other # serializer class. fields_metadata = copy.deepcopy(self._declared_fields_metadata) if field_name is not None: return fields_metadata.get(field_name) return fields_metadata def get_validators(self): """ Returns a list of validator callables. """ # Used by the lazily-evaluated `validators` property. meta = getattr(self, "Meta", None) validators = getattr(meta, "validators", None) return validators[:] if validators else [] def get_initial(self): if hasattr(self, "initial_data"): return OrderedDict( [ (field_name, field.get_value(self.initial_data)) for field_name, field in self.fields.items() if (field.get_value(self.initial_data) is not empty) and not field.read_only ] ) return OrderedDict( [ (field.field_name, field.get_initial()) for field in self.fields.values() if not field.read_only ] ) def get_value(self, dictionary): # We override the default field access in order to support # nested HTML forms. if html.is_html_input(dictionary): return ( html.parse_html_dict(dictionary, prefix=self.field_name) or empty ) return dictionary.get(self.field_name, empty) def run_validation(self, data=empty): """ We override the default `run_validation`, because the validation performed by validators and the `.validate()` method should be coerced into an error dictionary with a 'non_fields_error' key. """ (is_empty_value, data) = self.validate_empty_values(data) if is_empty_value: return data value = self.to_internal_value(data) try: self.run_validators(value) value = self.validate(value) assert value is not None, ( ".validate() should return the " "validated data" ) except (ValidationError, DjangoValidationError) as exc: raise ValidationError(detail=as_serializer_error(exc)) return value def to_internal_value(self, data): """ Dict of native values <- Dict of primitive datatypes. """ if not isinstance(data, Mapping): message = self.error_messages["invalid"].format( datatype=type(data).__name__ ) raise ValidationError( {api_settings.NON_FIELD_ERRORS_KEY: [message]}, code="invalid", ) ret = OrderedDict() errors = OrderedDict() fields = self._writable_fields for field in fields: validate_method = getattr( self, "validate_" + field.field_name, None ) primitive_value = field.get_value(data) try: validated_value = field.run_validation(primitive_value) if validate_method is not None: validated_value = validate_method(validated_value) except ValidationError as exc: errors[field.field_name] = exc.detail except DjangoValidationError as exc: errors[field.field_name] = get_error_detail(exc) except SkipField: pass else: set_value(ret, field.source_attrs, validated_value) if errors: raise ValidationError(errors) return ret def to_representation(self, instance): """ Object instance -> Dict of primitive datatypes. """ ret = OrderedDict() fields = self._readable_fields for field in fields: try: attribute = field.get_attribute(instance) except SkipField: continue # We skip `to_representation` for `None` values so that # fields do not have to explicitly deal with that case. # # For related fields with `use_pk_only_optimization` we need to # resolve the pk value. check_for_none = ( attribute.pk if isinstance(attribute, PKOnlyObject) else attribute ) if check_for_none is None: ret[field.field_name] = None else: ret[field.field_name] = field.to_representation(attribute) return ret def validate(self, attrs): return attrs def __repr__(self): return representation.serializer_repr(self, indent=1) # The following are used for accessing `BoundField` instances on the # serializer, for the purposes of presenting a form-like API onto the # field values and field errors. def __iter__(self): for field in self.fields.values(): yield self[field.field_name] def __getitem__(self, key): field = self.fields[key] value = self.data.get(key) error = self.errors.get(key) if hasattr(self, "_errors") else None if isinstance(field, Serializer): return NestedBoundField(field, value, error) return BoundField(field, value, error) # Include a backlink to the serializer class on return objects. # Allows renderers such as HTMLFormRenderer to get the full field info. @property def data(self): ret = super(Serializer, self).data return ReturnDict(ret, serializer=self) @property def errors(self): ret = super(Serializer, self).errors if ( isinstance(ret, list) and len(ret) == 1 and getattr(ret[0], "code", None) == "null" ): # Edge case. Provide a more descriptive error than # "this field may not be null", when no data is passed. detail = ErrorDetail("No data provided", code="null") ret = {api_settings.NON_FIELD_ERRORS_KEY: [detail]} return ReturnDict(ret, serializer=self) def update(self, instance, validated_data): """Update method.""" # It's critical to assign the validated data to the instance, # however we SHOULD NOT save it, since it will obviously make # things break. All of this is done to trick the rest_framework # to make a profit of all the nice things that it provides # with lowest cost possible. for key, value in validated_data.items(): setattr(instance, key, value) return instance return Serializer