Source code for swak.jsonobject.jsonobject

from typing import Any, Self
from collections.abc import KeysView, Callable, Iterator
from functools import reduce
from ast import literal_eval
import json
from json import JSONDecodeError
from pandas import Series
from .fields import Maybe
from .exceptions import (
    SchemaError,
    DefaultsError,
    ParseError,
    CastError,
    ValidationErrors
)

type Json = dict[str, Any]
type Raw = str | bytes | bytearray | Json | Series | None
type Schema = dict[str, type | Callable[[Any], Any]]
type Types = tuple[type, ...]
type Tried = tuple[Json, list[Exception]]


class SchemaMeta(type):
    """Metaclass parsing, validating, and setting data schema definitions.

    This class is not intended to be instantiated directly but only to be used
    as a metaclass! When used as such, it parses type-annotated class variables
    (with or without default values) into `schema` and `defaults` class
    variables, and checks that type annotations are callable, can be called
    on the default values, and that default values are not ``None`` unless
    wrapped with ``Maybe``. Additionally, it makes sure that schema fields
    cannot collide with existing class variables as well as instance attributes
    and methods.
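
    Examples
    --------
    A minimal sketch (with hypothetical fields): type-annotated class
    variables become the schema, and assigned values become the (type-cast)
    defaults.

    >>> class Point(metaclass=SchemaMeta):
    ...     x: int
    ...     y: int = 0
    >>> Point.__defaults__
    {'y': 0}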

    """

    __blacklist__ = {
        'as_json',
        'as_series',
        'as_dtype',
        'get',  # Do we even need this method?
        'keys',
        '_serialize'
    }

    __ignore_extra__ = False
    __raise_extra__ = True
    __respect_none__ = False

    def __new__(
            mcs,  # noqa: N804
            name: str,
            bases: Types,
            attrs: Json,
            **kwargs: Any
    ) -> 'SchemaMeta':
        # Create a new class object
        cls = super().__new__(mcs, name, bases, attrs)

        # Set behavior from past and present class keywords
        ancestral_variables = mcs.__ancestral(cls, '__dict__')
        cls.__ignore_extra__ = kwargs.pop(
            'ignore_extra',
            ancestral_variables.get('__ignore_extra__', mcs.__ignore_extra__)
        )
        cls.__raise_extra__ = kwargs.pop(
            'raise_extra',
            ancestral_variables.get('__raise_extra__', mcs.__raise_extra__)
        )
        cls.__respect_none__ = kwargs.pop(
            'respect_none',
            ancestral_variables.get('__respect_none__', mcs.__respect_none__)
        )

        # Consolidate values from "keys_from" keyword and the remaining kwargs
        kwargs = kwargs.pop('keys_from', {}) | kwargs

        # The schema is in the class __annotations__
        ancestral_schema = mcs.__ancestral(cls, '__annotations__')
        # Extract (string) keys from additional keyword arguments
        kwarg_schema = dict.fromkeys(kwargs, str)
        # Class body fields overwrite keyword fields overwrite inherited fields
        schema = ancestral_schema | kwarg_schema | cls.__annotations__
        # Validate the schema just assembled
        schema, schema_errors = mcs.__valid(name, schema, mcs.__blacklist__)

        # Ancestral defaults are in the class __defaults__
        ancestral_defaults = mcs.__ancestral(cls, '__defaults__')
        # Defaults for additional fields from keyword arguments are the keys
        kwarg_defaults = {k: k for k in kwargs if k not in cls.__annotations__}
        # Class variables overwrite keyword defaults overwrite ancestry
        updated_defaults = ancestral_defaults | kwarg_defaults | cls.__dict__
        # Only type-annotated class variables are relevant
        filtered_defaults = mcs.__filter(schema, updated_defaults)
        # Use __defaults__ because __dict__ cannot be set or updated directly
        defaults, default_errors = mcs.__tried(filtered_defaults, schema)

        # Raise accumulated errors, if any
        errors = schema_errors + default_errors
        if errors:
            raise ValidationErrors(name, errors)

        # Set hidden class variables
        cls.__blacklist__ = mcs.__blacklist__
        cls.__annotations__ = schema
        cls.__defaults__ = defaults

        return cls

    @staticmethod
    def __ancestral(descendant: 'SchemaMeta', attribute: str) -> Json:
        """Accumulate dictionary class variables down the inheritance tree."""
        # Get class ancestors starting with the oldest
        lineage = reversed(descendant.mro()[1:])
        # Accumulate inherited dictionary attributes, overwriting old with new
        return reduce(descendant.__merge(attribute), lineage, {})

    @staticmethod
    def __merge(attribute: str) -> Callable[[Json, Any], Json]:
        """Provide update function for dictionary class attributes."""

        def update(older: Json, newer: Any) -> Json:
            """Update dictionary attribute of parent with that of child."""
            return {**older, **getattr(newer, attribute, {})}

        return update

    @staticmethod
    def __filter(schema: Schema, defaults: Json) -> Json:
        """Filter down class __dict__ to keys present in the schema."""
        return {key: defaults[key] for key in defaults if key in schema}

    @staticmethod
    def __valid(
            name: str,
            schema: Schema,
            blacklist: set[str]
    ) -> tuple[Schema, list[Exception]]:
        """Validate that class-variable annotations are sane."""
        hidden = f'_{name}__'  # Pattern for double-underscore class variables
        errors = []
        for field, annotation in schema.items():
            if not callable(annotation):
                msg = f'Annotation of field "{field}" is not callable!'
                errors.append(SchemaError(msg))
            if field in blacklist:
                msg = f'Field "{field}" is on the blacklist {blacklist}!'
                errors.append(SchemaError(msg))
            if field.startswith(hidden):
                cleaned = field.removeprefix(hidden)
                msg = f'Field "__{cleaned}" starts with two underscores "__"!'
                errors.append(SchemaError(msg))
        return schema, errors

    @staticmethod
    def __tried(defaults: Json, schema: Schema) -> Tried:
        """Ensure that class-variable defaults are sane."""
        errors = []
        for item in defaults:
            # Check for None values and whether they are allowed
            default_is_none = defaults[item] is None
            type_is_not_maybe = not isinstance(schema[item], Maybe)
            if default_is_none and type_is_not_maybe:
                msg = (f'For the default value of field "{item}" to be None,'
                       ' annotate it as Maybe(<YOUR_TYPE>) in the schema!')
                errors.append(DefaultsError(msg))
            # Check that schema annotations can be called on default values
            try:
                defaults[item] = schema[item](defaults[item])
            except (TypeError, ValueError):
                msg = (f'Default value for field "{item}" can'
                       ' not be cast to the desired type!')
                errors.append(DefaultsError(msg))
        return defaults, errors


# ToDo: Add polars support!
class JsonObject(metaclass=SchemaMeta):
    """Flexible dataclass-like data structure with enforced type schema.

    This class is not meant to ever be instantiated directly. Rather, inherit
    from it, and specify fields as type-annotated class variables, potentially
    also giving default values. Values for non-default fields must be provided
    on instantiation in the form of a JSON string, a dictionary-like object,
    or keyword arguments. The handling of additional fields can be specified
    via the boolean class keywords `ignore_extra` (defaults to ``False``) and
    `raise_extra` (defaults to ``True``).

    By default, JSON fields with a ``None`` value are ignored and treated as
    not being present. To actually set fields to ``None`` (and, potentially,
    overwrite defaults), the class keyword `respect_none` needs to be set to
    ``True`` on subclass definition. Note, however, that type annotations must
    also tolerate ``None`` values, which is realized by wrapping existing
    types into ``Maybe`` instances.

    The resulting object behaves in many ways like a dictionary, allowing
    dictionary-style, but also object-style access to data fields. Attributes
    of nested instances can be accessed dictionary-style (i.e., with the
    square-bracket accessor) with a dot.separated key.

    Parameters
    ----------
    mapping: dict, str, bytes, or Series, optional
        Dictionary with string keys, JSON string/bytes, or pandas Series.
        Defaults to an empty dictionary.
    **kwargs
        Can be any value or, for nested structures, again a dictionary with
        string keys or a JSON string/bytes or a pandas Series. Keyword
        arguments will override values already present in the `mapping`.

    Raises
    ------
    ValidationErrors
        ExceptionGroup containing any number of the following exceptions.
    ParseError
        If the (keyword) arguments cannot be parsed into a dictionary with
        string keys, or if non-default fields are given neither in the
        `mapping` nor in the keyword arguments.
    CastError
        If the dictionary values cannot be cast into the types specified in
        the schema.

    Warnings
    --------
    This class is rather heavy, so do not use it to, e.g., wrap JSON payloads
    in high-throughput, low-latency web services!

    See Also
    --------
    fields.Maybe
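
    Examples
    --------
    A minimal sketch with hypothetical fields (``Maybe`` wraps a type so
    that ``None`` values may pass through):

    >>> class User(JsonObject):
    ...     name: str
    ...     age: int = 0
    ...     nick: Maybe(str) = None
    >>> user = User('{"name": "Jane"}', age=42)
    >>> user.age
    42
    >>> user['name']
    'Jane'
    >>> user.as_json
    {'name': 'Jane', 'age': 42, 'nick': None}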
    """

    def __init__(self, mapping: Raw | Self = None, **kwargs: Any) -> None:
        # Fully nest the parsed and purged dictionaries with dot.separated keys
        parsed = self.__nest(self.__purge(self.__parse(mapping)))
        kwargs = self.__nest(self.__purge(self.__parse(kwargs, 1)))
        defaults = self.__nest(self.__defaults__)
        # Merge the fully nested dictionaries
        merged = self.__merge(defaults, self.__merge(parsed, kwargs))
        # Type-cast the merged dictionary
        cast = self.__cast(merged)
        # Set all dictionary items as object attributes
        self.__dict__.update(cast)

    def __getitem__(self, key: str) -> Any:
        # Raise for blacklisted keys
        if key in self.__blacklist__:
            raise KeyError(key)
        # Try and split the (string) key by dots
        try:
            root, *children = key.split('.')
        except AttributeError as error:
            cls = type(key).__name__
            msg = f'Keys must be strings, not {cls} like {key}!'
            raise KeyError(msg) from error
        # The key could also refer to an attribute like a property or a method
        try:
            value = self.__dict__.get(root, getattr(self, root))
        # ... but we still raise a KeyError to meet expectations
        except AttributeError as error:
            raise KeyError(key) from error
        # If the key contains dots, recurse down into the value
        return reduce(lambda x, y: x[y], children, value)

    def __iter__(self) -> Iterator[str]:
        return self.__dict__.__iter__()

    def __str__(self) -> str:
        return json.dumps(self.__dict__, default=self._serialize)

    def __repr__(self) -> str:
        return json.dumps(self.__dict__, indent=4, default=self._serialize)

    __hash__ = None

    def __eq__(self, other: Self) -> bool:
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        return NotImplemented

    def __ne__(self, other: Self) -> bool:
        if isinstance(other, self.__class__):
            return self.__dict__ != other.__dict__
        return NotImplemented

    def __contains__(self, key: str) -> bool:
        return key in self.__dict__

    def __len__(self) -> int:
        return len(self.__dict__)

    def __bool__(self) -> bool:
        return bool(self.__dict__)

    def __or__(self, other: Raw | Self) -> Self:
        return self.__call__(other)

    def __ror__(self, other: Raw | Self) -> dict:
        try:
            return {**other, **self}
        except TypeError:
            return NotImplemented

    def __call__(self, mapping: Raw | Self = None, **kwargs: Any) -> Self:
        """Update one or more (nested) fields with `mapping` and kwargs.

        Parameters
        ----------
        mapping: dict, str, bytes, or Series, optional
            Dictionary with string keys, JSON string/bytes, or pandas Series.
            Defaults to an empty dictionary.
        **kwargs
            Can be any value or, for nested structures, again a dictionary
            with string keys or a JSON string/bytes or a pandas Series.
            Keyword arguments will override values already present in the
            `mapping`.

        Returns
        -------
        JsonObject
            A new instance of self with updated values.

        Raises
        ------
        ValidationErrors
            ExceptionGroup containing any number of the following exceptions.
        ParseError
            If the (keyword) arguments cannot be parsed into a dictionary
            with string keys.
        CastError
            If the dictionary values cannot be cast into the types specified
            in the schema.
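
        Examples
        --------
        A hedged sketch with a hypothetical ``User`` class; the ``|``
        operator delegates to this method:

        >>> class User(JsonObject):
        ...     name: str
        ...     age: int = 0
        >>> user = User('{"name": "Jane", "age": 30}')
        >>> user(age=31).age
        31
        >>> (user | {'age': 31}).age
        31
        >>> user.age  # the original instance is unchanged
        30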
        """
        # Fully nest the parsed and purged dictionaries with dot.separated keys
        parsed = self.__nest(self.__purge(self.__parse(mapping)))
        kwargs = self.__nest(self.__purge(self.__parse(kwargs, 1)))
        # Merge the call arguments
        merged = self.__merge(parsed, kwargs)
        # Left merge with self
        merged = self.__merge(self.__dict__, merged, True)
        # Instantiate a new, updated copy of self from the fully nested update
        return self.__class__(merged)

    @property
    def as_json(self) -> Json:
        """JSON-serializable dictionary representation."""
        return json.loads(str(self))

    @property
    def as_dtype(self) -> str:
        """Representation in a cell of a pandas data frame."""
        return self.__str__()

    @property
    def as_series(self) -> Series:
        """Representation as a pandas series."""
        data = {key: getattr(self[key], 'as_dtype', self[key]) for key in self}
        name = self.__class__.__name__
        return Series(data, name=name)

    # Do we even need this method?
    def get(self, item: str, default: Any = None) -> Any:
        """Get (nested) attribute by (dot.separated) name or default."""
        try:
            return self[item]
        except KeyError:
            return default
    def keys(self) -> KeysView[str]:
        """Attribute names as dictionary keys."""
        return self.__dict__.keys()
    @staticmethod
    def _serialize(obj: Any) -> Any:
        """Default JSON-encoding for attributes not trivially serializable."""
        return obj.as_json if hasattr(obj, 'as_json') else repr(obj)

    def __parse(self, obj: Raw | Self, level: int = 0) -> Json:
        """Recursively parse input into a (nested) dictionary."""
        # For the initial, root-level call, None means an empty dictionary
        mapping = {} if obj is None and level == 0 else obj
        # Try to parse the input as a JSON string ...
        try:
            parsed = json.loads(mapping)
        except (TypeError, JSONDecodeError):
            # ... or some other string representation of a python object
            try:
                parsed = literal_eval(mapping)
            # In case of failure, it might already be a dictionary
            except (TypeError, ValueError, SyntaxError):
                parsed = mapping
        # If it is, this should work.
        try:
            parsed = {**parsed}
        # If not ...
        except TypeError as error:
            # ... we're done with the recursion and simply return the input ...
            if level > 0:
                return obj
            # ... unless this was the initial, root-level call. Then we fail
            raise ParseError(f'Could not parse {obj} as JSON!') from error
        # Recurse further down into the values of the parsed dictionary
        return {key: self.__parse(parsed[key], level + 1) for key in parsed}

    def __purge(self, mapping: Json) -> Json:
        """Eliminate items with ``None`` value according to `respect_none`."""
        filters = {True: lambda _: True, False: lambda xs: xs[1] is not None}
        return dict(filter(filters[self.__respect_none__], mapping.items()))

    @staticmethod
    def __stop_recursion_for(obj: Any) -> bool:
        """Criterion for stopping recursion during nesting and merging.

        As we recursively traverse the tree of dictionary-like objects from
        root to leaves, we stop when we arrive at a leaf that is no longer
        dictionary-like.

        """
        try:
            _ = [*obj.keys()]
        except (AttributeError, TypeError):
            return True
        return not hasattr(obj, '__getitem__')

    def __nest(self, mapping: Json | Self) -> Json:
        """Nest a dictionary with nesting implied by dot.separated keys."""
        # If the input is no longer dictionary-like, end the recursion
        if self.__stop_recursion_for(mapping):
            return mapping
        # If it is, initialize the return value ...
        result = {}
        # ... and iterate through the keys
        for key in mapping.keys():  # noqa: SIM118
            # Get the value of the current key
            value = mapping[key]
            # Depending on the type of the key ...
            if isinstance(key, str):
                # ... split the root from the children
                root, *children = key.split('.')
            else:
                # ... or leave it as it is
                root, *children = key,
            # If the current key did have dots, ...
            if children:
                # ... the value is elevated to a dict
                value = {'.'.join(children): value}
            # If the root key already exists in the results ...
            if root in result:
                # ... merge it with the new value
                result[root] = self.__merge(result[root], value)
            else:
                # ... or, if not, just set it to the new value
                result[root] = value
        # After nesting one level, recurse further down on the values
        return {key: self.__nest(value) for key, value in result.items()}

    def __merge(self, old: Json, new: Json, left: bool = False) -> Json:
        """Recursively deep-merge two dictionaries, outer or left."""
        if self.__stop_recursion_for(old) or self.__stop_recursion_for(new):
            return new
        # First the old values in order of appearance ...
        result = {key: old[key] for key in old if key not in new}
        # ... then intersection of old and new in order of appearance in old
        for key in [key for key in old if key in new]:
            result[key] = self.__merge(old[key], new[key], left)
        # If requested, add fields only present in new in order of appearance
        right = {} if left else {k: new[k] for k in new if k not in old}
        return result | right

    def __cast(self, mapping: Json) -> Json:
        """Cast all fields in the data structure to their specified type."""
        # Initialize accumulators
        cast = {}
        errors = []
        # Iterate over the fields in the schema
        for item, type_cast in self.__annotations__.items():
            try:
                value = mapping[item]
            except KeyError:
                msg = f'Missing non-default field "{item}"!'
                errors.append(ParseError(msg))
                continue
            try:
                cast[item] = type_cast(value)
            except (TypeError, ValueError):
                msg = f'Could not cast field "{item}" to the desired type!'
                errors.append(CastError(msg))
            except ValidationErrors as error_group:
                errors.append(error_group)
            if value is None and not isinstance(type_cast, Maybe):
                msg = (f'For the value of field "{item}" to be None, annotate'
                       ' it as Maybe(<YOUR_TYPE>) in the schema!')
                errors.append(CastError(msg))
        # If we don't have to deal with extra fields, we're done
        if self.__ignore_extra__:
            if errors:
                raise ValidationErrors(self.__class__.__name__, errors)
            return cast
        # If not, first check if we even allow extra fields
        extra_fields = set(mapping) - set(self.__annotations__)
        if extra_fields and self.__raise_extra__:
            msg = f'Fields {extra_fields} are not in the schema!'
            errors.append(ParseError(msg))
            raise ValidationErrors(self.__class__.__name__, errors)
        # Even if extra fields are not ignored and are allowed, we need to ...
        for field in extra_fields:
            # ... check that their keys are strings, ...
            if not isinstance(field, str):
                msg = f'Extra field "{field}" does not have a string key!'
                errors.append(ParseError(msg))
                continue
            # ... check that their keys are not blacklisted, ...
            if field in self.__blacklist__:
                msg = (f'Extra field "{field}" is on the '
                       f'blacklist {self.__blacklist__}!')
                errors.append(ParseError(msg))
            # ... and check that their keys are valid python identifiers.
            if not all(part.isidentifier() for part in field.split('.')):
                msg = (f'Not all parts of the (potentially dot.separated) key'
                       f' of field "{field}" are valid python identifiers!')
                errors.append(ParseError(msg))
        # If we found anything fishy, raise all errors together
        if errors:
            raise ValidationErrors(self.__class__.__name__, errors)
        # Only now do we accept and merge extra fields.
        extras = {field: mapping[field] for field in extra_fields}
        return {**cast, **extras}
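

# A self-contained usage sketch (hypothetical Address and User classes),
# appended for illustration only: nested schemas are cast recursively, and
# nested values can be retrieved with a dot.separated key.
if __name__ == '__main__':

    class Address(JsonObject):
        city: str = 'n/a'

    class User(JsonObject):
        name: str
        address: Address = Address()

    user = User('{"name": "Jane", "address": {"city": "Berlin"}}')
    print(user['address.city'])  # prints: Berlin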