Source code for testplan.common.serialization.fields

"""
Custom marshmallow fields.
"""
import abc
import pprint

import pytz
from dateutil import parser
from lxml import etree

from marshmallow import fields
from marshmallow import class_registry
from marshmallow.base import SchemaABC
from marshmallow.utils import missing as missing_

from testplan.common.utils import comparison

# We explicitly enumerate types that are known to be safe to serialize by
# pickle. All other types will be converted to strings before pickling.
# ``types.NoneType`` is not reliably available in Python 3, so we inspect
# the type of ``None`` directly instead.
COMPATIBLE_TYPES = (bool, float, type(None), str, bytes, int)

# pylint: disable=unused-argument


class Serializable(metaclass=abc.ABCMeta):
    """
    Interface for objects that provide their own serialized form.

    Concrete subclasses must implement :py:meth:`serialize`.
    """

    @abc.abstractmethod
    def serialize(self):
        """Return the serialized representation of this object."""
class FormattedValue(Serializable):
    """
    Save a formatted value in WebUI: a raw value paired with its display text.
    """

    def __init__(self, value, display):
        """
        :param value: The value of the data for sorting in the report.
        :type value: ``Union[str, numbers.Real]``
        :param display: Formatted value for display in the report.
        :type display: ``str``
        """
        self.value, self.display = value, display

    def serialize(self):
        """
        Build the dictionary form of this value.

        :return: mapping with ``value``, ``display`` and a constant
            ``type`` marker of ``"formattedValue"``.
        :rtype: ``dict``
        """
        return dict(
            value=self.value,
            display=self.display,
            type="formattedValue",
        )
def _repr_obj(obj):
    """
    Return ``repr(obj)``, falling back to the default ``object`` repr when
    the object's own ``__repr__`` raises.
    """
    # copypasta from unittest code
    try:
        text = repr(obj)
    except Exception:
        # A broken custom ``__repr__`` must not break formatting.
        text = object.__repr__(obj)
    return text
def native_or_pformat(value):
    """
    Generic serialization compatible value formatter.

    Regex patterns, ``comparison.Callable`` objects and other callables are
    first replaced by a descriptive value. The outcome is returned unchanged
    when its exact type is listed in ``COMPATIBLE_TYPES``, otherwise it is
    pretty formatted into a string.
    """
    if comparison.is_regex(value):
        converted = "REGEX({})".format(value.pattern)
    elif isinstance(value, comparison.Callable):
        converted = str(value)
    elif callable(value):
        # The fallback repr is computed eagerly, as a plain ``getattr``
        # default argument would be.
        fallback = _repr_obj(value)
        converted = getattr(value, "__name__", fallback)
    else:
        converted = value

    # For basic builtin types we return the value unchanged. All other types
    # will be formatted as strings.
    if type(converted) not in COMPATIBLE_TYPES:
        return pprint.pformat(converted)
    return converted
def native_or_pformat_dict(value):
    """
    Converter utility for dictionaries: keeps the keys as they are and
    converts every value to a JSON friendly format.
    """
    formatted = {}
    for key, item in value.items():
        formatted[key] = native_or_pformat(item)
    return formatted
def native_or_pformat_list(value):
    """
    Converter utility for lists: converts every item to a JSON friendly
    format and returns the results as a new list.
    """
    return list(map(native_or_pformat, value))
class Unicode(fields.Field):
    """
    Field that tries to convert value into a unicode object
    with the given codecs.

    Marshmallow internally decodes to utf-8 encoding, however
    it fails on Python 2 for str values like ``@t\xe9\xa7t\xfel\xe5\xf1``.

    So we have this field with explicit codecs instead.
    """

    codecs = ["utf-8", "latin-1"]  # Ideally we will let users override this

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Return ``value`` as ``str``.

        ``None`` and ``str`` values pass through untouched, ``bytes`` are
        decoded with the first codec in ``self.codecs`` that succeeds and
        anything else goes through ``str()``.

        :raises ValueError: if none of the codecs can decode the bytes.
        """
        if value is None or isinstance(value, str):
            return value

        if not isinstance(value, bytes):
            return str(value)

        for codec in self.codecs:
            try:
                return str(value, codec)
            except UnicodeDecodeError:
                # Try the next codec in the list.
                continue

        raise ValueError(
            "Could not decode {value} to unicode"
            " with the given codecs: {codecs}".format(
                value=value, codecs=self.codecs
            )
        )
class NativeOrPretty(fields.Field):
    """
    Uses serialization compatible native values
    or pretty formatted str representation.
    """

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Delegate to ``value.serialize()`` for ``Serializable`` objects and to
        ``native_or_pformat`` for everything else.
        """
        if not isinstance(value, Serializable):
            return native_or_pformat(value)
        return value.serialize()
class NativeOrPrettyDict(fields.Field):
    """
    Dictionary serialization with native or pretty formatted values.
    Keys should be JSON serializable (str type),
    should be used for flat dicts only.
    """

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Validate that ``value`` is a ``dict`` with ``str`` keys only, then
        convert its values with ``native_or_pformat_dict``.

        :raises TypeError: if ``value`` is not a ``dict`` or has a key that
            is not a ``str``.
        """
        if not isinstance(value, dict):
            raise TypeError(
                "`value` ({value}) should be"
                " `dict` type, it was: {type}".format(
                    value=value, type=type(value)
                )
            )

        non_str_keys = [key for key in value if not isinstance(key, str)]
        if non_str_keys:
            # Report the first offending key, in dictionary order.
            key = non_str_keys[0]
            raise TypeError(
                "`key` ({key}) should be of"
                " `str` type, it was: {type}".format(key=key, type=type(key))
            )

        return native_or_pformat_dict(value)
# TODO: Move to entries
class RowComparisonField(fields.Field):
    """Serialization logic for RowComparison"""

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Unpack the ``(idx, row, diff, errors, extra)`` sequence and return
        a tuple of the same shape: the row is converted as a list, the last
        three items as dictionaries and ``idx`` is passed through as is.
        """
        idx, row, diff, errors, extra = value
        serialized_row = native_or_pformat_list(row)
        serialized_dicts = tuple(
            native_or_pformat_dict(part) for part in (diff, errors, extra)
        )
        return (idx, serialized_row) + serialized_dicts
class SliceComparisonField(fields.Field):
    """Serialization logic for SliceComparison"""

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Unpack the five item comparison sequence and return a tuple with the
        ``repr`` of the slice, both index collections untouched, and the
        actual / expected values either kept as is (when they are ``str``)
        or converted item by item.
        """
        slice_obj, comp_indices, mismatch_indices, actual, expected = value

        slice_repr = repr(slice_obj)
        actual_out, expected_out = [
            seq if isinstance(seq, str) else native_or_pformat_list(seq)
            for seq in (actual, expected)
        ]

        return (
            slice_repr,
            comp_indices,
            mismatch_indices,
            actual_out,
            expected_out,
        )
class ColumnContainComparisonField(fields.Field):
    """Serialization logic for ColumnContainComparison"""

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Return an ``(idx, value, passed)`` tuple built from the attributes of
        the comparison object, with the ``value`` attribute converted by
        ``native_or_pformat``.
        """
        index = value.idx
        formatted = native_or_pformat(value.value)
        return (index, formatted, value.passed)
class XMLElementField(fields.Field):
    """Custom field for `lxml.etree.Element serialization`."""

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Render the element with ``etree.tostring`` (pretty printed) and
        decode the result as utf-8 text.
        """
        xml_bytes = etree.tostring(value, pretty_print=True)
        return xml_bytes.decode("utf-8")
class ClassName(fields.Field):
    """Return the class name of the `obj`."""

    _CHECK_ATTRIBUTE = False

    class Meta:  # pylint: disable=bad-option-value,old-style-class,missing-docstring,no-init
        dump_only = True

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Ignore ``value`` and return the name of the class of ``obj``, the
        object that is being serialized.
        """
        owner_class = obj.__class__
        return owner_class.__name__
class DictMatch(fields.Field):
    """
    Serialize an object into a dictionary made of its ``value``,
    ``ignore`` and ``only`` attributes.
    """

    def _serialize(self, value, attr, obj, **kwargs):
        """Return the three attributes of ``value`` keyed by their names."""
        return {
            "value": value.value,
            "ignore": value.ignore,
            "only": value.only,
        }
class GenericNested(fields.Field):
    """
    Marshmallow does not support multiple schemas for a single `Nested` field.
    There is a project (marshmallow-oneofschema) that has similar
    functionality but it doesn't support self-referencing schemas,
    which is needed for serializing tree structures.

    This field should be used along with the `ClassName` field to return the
    type (class name) of the objects, so it can choose the correct
    schema during deserialization.
    """

    def __init__(
        self, schema_context, type_field="type", default=missing_, **kwargs
    ):
        """
        :param schema_context: Mapping of object type (a class or a class
            name string) to the schema used for objects of that type. The
            schema may be a schema instance, a schema class, a registered
            schema name, the string ``"self"`` or a callable returning one
            of these.
        :param type_field: Stored as ``self.type_field``; it is not read
            anywhere else within this class.
        :param default: Passed on to ``fields.Field``.
        :param kwargs: Passed on to ``fields.Field``. ``many`` is
            additionally read from here and used when schemas are
            instantiated.
        """
        self.schema_context = schema_context
        self.type_field = type_field
        self.many = kwargs.get("many", False)
        super(GenericNested, self).__init__(default=default, **kwargs)

    def _get_schema_obj(self, schema_value):
        """
        Resolve a ``schema_context`` value into a schema instance.

        :param schema_value: schema instance, schema class, registered
            schema name, ``"self"`` or a callable returning one of these.
        :return: schema instance that carries the parent's context.
        :raises ValueError: if ``schema_value`` is none of the above.
        """
        parent_ctx = getattr(self.parent, "context", {})

        # Non-class callables are factories, call them to get the real value.
        if callable(schema_value) and not isinstance(schema_value, type):
            schema_value = schema_value()

        if isinstance(schema_value, SchemaABC):
            # Existing instances are reused, only their context is updated.
            schema_value.context.update(parent_ctx)
            return schema_value

        elif isinstance(schema_value, type) and issubclass(
            schema_value, SchemaABC
        ):
            return schema_value(many=self.many, context=parent_ctx)

        elif isinstance(schema_value, str):
            if schema_value == "self":
                # Self reference: instantiate the schema class of the parent.
                return self.parent.__class__(
                    many=self.many, context=parent_ctx
                )
            else:
                schema_class = class_registry.get_class(schema_value)
                return schema_class(many=self.many, context=parent_ctx)

        raise ValueError(
            "Invalid value for schema: {}, {}".format(
                schema_value, type(schema_value)
            )
        )

    @property
    def schemas(self):
        """
        Return schema mapping in `<CLASS_NAME>: <SCHEMA_OBJECT>` format.

        The mapping is rebuilt on every access, so callers that need it more
        than once should keep a reference to the result.

        :raises ValueError: if a key of ``schema_context`` is neither a
            string nor a class.
        """
        result = {}

        for object_type, schema_value in self.schema_context.items():
            if isinstance(object_type, str):
                key = object_type
            elif isinstance(object_type, type):
                key = object_type.__name__
            else:
                raise ValueError(
                    "Invalid value for object type ({}), strings"
                    " and class objects are allowed.".format(object_type)
                )
            result[key] = self._get_schema_obj(schema_value)
        return result

    def _serialize(self, nested_obj, attr, obj, **kwargs):
        """
        Serialize ``nested_obj`` with the schema registered for its class.

        Lists and tuples are serialized item by item (recursively) into a
        list; ``None`` is returned as ``None``.

        :raises KeyError: if no schema is declared for the class name of an
            object.
        """
        if nested_obj is None:
            return None

        # Build the schema mapping once per call. It used to be rebuilt for
        # every element of a list, instantiating every schema each time.
        return self._dump_nested(nested_obj, self.schemas)

    def _dump_nested(self, nested_obj, schemas):
        """Dump ``nested_obj`` (or each of its items) using ``schemas``."""
        if nested_obj is None:
            return None

        if isinstance(nested_obj, (list, tuple)):
            return [self._dump_nested(item, schemas) for item in nested_obj]

        class_name = nested_obj.__class__.__name__

        if class_name not in schemas:
            raise KeyError(
                "No schema declaration found in"
                " `schema_context` for : {}".format(class_name)
            )

        schema_obj = schemas[class_name]
        # Always dump a single object here, sequences are handled above.
        return schema_obj.dump(nested_obj, many=False)
class UTCDateTime(fields.DateTime):
    """
    A formatted datetime string that represents UTC time.
    Naive datetime will be thought as in UTC timezone.
    Example: 2014-12-22T03:12:58.019077+00:00 (always ends with '+00:00')
    """

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Return the ISO format string of ``value`` in UTC, ``None`` stays
        ``None``. Naive values are tagged as UTC, aware ones are converted.
        """
        if value is None:
            return None

        if value.tzinfo is None:
            utc_value = value.replace(tzinfo=pytz.UTC)
        else:
            utc_value = value.astimezone(tz=pytz.UTC)
        return utc_value.isoformat()

    def _deserialize(self, value, attr, data, **kwargs):
        """
        Parse ``value`` with ``dateutil`` and return a datetime in UTC,
        ``None`` stays ``None``. Naive results are tagged as UTC, aware ones
        are converted.
        """
        if value is None:
            return None

        parsed = parser.parse(value)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=pytz.UTC)
        return parsed.astimezone(tz=pytz.UTC)
class LocalDateTime(fields.DateTime):
    """
    A formatted datetime string that represents machine time.
    Naive datetime will be thought as in local timezone.
    Example: 2014-12-22T11:12:58.019077+08:00

    Note: Since Python 3.6 `datetime.datetime.astimezone` method can be called
          on naive instances that are presumed to represent system local time.
    """

    def _serialize(self, value, attr, obj, **kwargs):
        """
        Return the ISO format string of ``value`` converted with a bare
        ``astimezone()`` call, ``None`` stays ``None``.
        """
        if value is None:
            return None
        return value.astimezone().isoformat()

    def _deserialize(self, value, attr, data, **kwargs):
        """
        Parse ``value`` with ``dateutil`` and convert the result with a bare
        ``astimezone()`` call, ``None`` stays ``None``.
        """
        if value is None:
            return None
        return parser.parse(value).astimezone()
class ExceptionField(fields.Field):
    """
    Serialize exceptions type and message.
    """

    def _serialize(self, value, attr, obj, **kwargs):
        """Return a ``(str(type(value)), str(value))`` tuple."""
        type_text = str(type(value))
        message_text = str(value)
        return (type_text, message_text)