Source code for interface.interface

from collections import defaultdict
from operator import attrgetter, itemgetter
from textwrap import dedent
from weakref import WeakKeyDictionary

from .compat import raise_from, with_metaclass
from .default import default, warn_if_defaults_use_non_interface_members
from .formatting import bulleted_list
from .functional import complement, keyfilter, merge, valfilter
from .typecheck import compatible
from .typed_signature import TypedSignature
from .utils import is_a, unique

first = itemgetter(0)
getname = attrgetter("__name__")


class InvalidImplementation(TypeError):
    """
    Raised when a class intending to implement an interface fails to do so.
    """


class InvalidSubInterface(TypeError):
    """
    Raised when on attempt to define a subclass of an interface that's not
    compatible with the parent definition.
    """


CLASS_ATTRIBUTE_WHITELIST = frozenset(
    [
        "__doc__",
        "__module__",
        "__name__",
        "__qualname__",
        "__weakref__",
        "_INTERFACE_IGNORE_MEMBERS",
    ]
)

is_interface_field_name = complement(CLASS_ATTRIBUTE_WHITELIST.__contains__)

TRIVIAL_CLASS_ATTRIBUTES = frozenset(dir(type("_", (object,), {})))


def static_get_type_attr(t, name):
    """
    Get a type attribute statically, circumventing the descriptor protocol.
    """
    for type_ in t.mro():
        try:
            return vars(type_)[name]
        except KeyError:
            pass
    raise AttributeError(name)


def _conflicting_defaults(typename, conflicts):
    """Format an error message for conflicting default implementations.

    Parameters
    ----------
    typename : str
        Name of the type for which we're producing an error.
    conflicts : dict[str -> list[Interface]]
        Map from strings to interfaces providing a default with that name.

    Returns
    -------
    message : str
        User-facing error message.
    """
    message = "\nclass {C} received conflicting default implementations:".format(
        C=typename,
    )
    for attrname, interfaces in conflicts.items():
        message += dedent(
            """

            The following interfaces provided default implementations for {attr!r}:
            {interfaces}"""
        ).format(
            attr=attrname,
            interfaces=bulleted_list(sorted(map(getname, interfaces))),
        )
    return InvalidImplementation(message)


def _merge_parent_signatures(bases):
    return merge(filter(None, (getattr(b, "_signatures") for b in bases)))


def _merge_parent_defaults(bases):
    return merge(filter(None, (getattr(b, "_defaults") for b in bases)))


class InterfaceMeta(type):
    """
    Metaclass for interfaces.

    Supplies a ``_signatures`` attribute.
    """

    def __new__(mcls, name, bases, clsdict):
        signatures = _merge_parent_signatures(bases)
        defaults = _merge_parent_defaults(bases)
        ignored = clsdict.get("_INTERFACE_IGNORE_MEMBERS", set())

        for field, v in keyfilter(is_interface_field_name, clsdict).items():
            if field in ignored:
                continue

            try:
                signature = TypedSignature(v)
            except TypeError as e:
                errmsg = (
                    "Couldn't parse signature for field "
                    "{iface_name}.{fieldname} of type {attrtype}.".format(
                        iface_name=name,
                        fieldname=field,
                        attrtype=getname(type(v)),
                    )
                )
                raise_from(TypeError(errmsg), e)

            # If we already have a signature for this field from a parent, then
            # our new signature must be a subtype of the parent signature, so
            # that any valid call to the new signature must also be a valid
            # call to the parent signature.
            if field in signatures and not compatible(signature, signatures[field]):
                conflicted = signatures[field]
                raise InvalidSubInterface(
                    "\nInterface field {new}.{field} conflicts with inherited field of "
                    "the same name.\n"
                    "  - {field}{new_sig} != {field}{old_sig}".format(
                        new=name,
                        field=field,
                        new_sig=signature,
                        old_sig=conflicted,
                    )
                )
            else:
                signatures[field] = signature

            if isinstance(v, default):
                defaults[field] = v

        warn_if_defaults_use_non_interface_members(
            name, defaults, set(signatures.keys())
        )

        clsdict["_signatures"] = signatures
        clsdict["_defaults"] = defaults
        return super(InterfaceMeta, mcls).__new__(mcls, name, bases, clsdict)

    def _diff_signatures(self, type_):
        """
        Diff our method signatures against the methods provided by type_.

        Parameters
        ----------
        type_ : type
           The type to check.

        Returns
        -------
        missing, mistyped, mismatched : list[str], dict[str -> type], dict[str -> signature]  # noqa
            ``missing`` is a list of missing interface names.
            ``mistyped`` is a list mapping names to incorrect types.
            ``mismatched`` is a dict mapping names to incorrect signatures.
        """
        missing = []
        mistyped = {}
        mismatched = {}
        for name, iface_sig in self._signatures.items():
            try:
                # Don't invoke the descriptor protocol here so that we get
                # staticmethod/classmethod/property objects instead of the
                # functions they wrap.
                f = static_get_type_attr(type_, name)
            except AttributeError:
                missing.append(name)
                continue

            impl_sig = TypedSignature(f)

            if not issubclass(impl_sig.type, iface_sig.type):
                mistyped[name] = impl_sig.type

            if not compatible(impl_sig, iface_sig):
                mismatched[name] = impl_sig

        return missing, mistyped, mismatched

    def verify(self, type_):
        """
        Check whether a type implements ``self``.

        Parameters
        ----------
        type_ : type
            The type to check.

        Raises
        ------
        TypeError
            If ``type_`` doesn't conform to our interface.

        Returns
        -------
        None
        """
        raw_missing, mistyped, mismatched = self._diff_signatures(type_)

        # See if we have defaults for missing methods.
        missing = []
        defaults_to_use = {}
        for name in raw_missing:
            try:
                defaults_to_use[name] = self._defaults[name].implementation
            except KeyError:
                missing.append(name)

        if not any((missing, mistyped, mismatched)):
            return defaults_to_use

        raise self._invalid_implementation(type_, missing, mistyped, mismatched)

    def _invalid_implementation(self, t, missing, mistyped, mismatched):
        """
        Make a TypeError explaining why ``t`` doesn't implement our interface.
        """
        assert missing or mistyped or mismatched, "Implementation wasn't invalid."

        message = "\nclass {C} failed to implement interface {I}:".format(
            C=getname(t),
            I=getname(self),
        )
        if missing:
            message += dedent(
                """

                The following methods of {I} were not implemented:
                {missing_methods}"""
            ).format(
                I=getname(self), missing_methods=self._format_missing_methods(missing)
            )

        if mistyped:
            message += dedent(
                """

                The following methods of {I} were implemented with incorrect types:
                {mismatched_types}"""
            ).format(
                I=getname(self),
                mismatched_types=self._format_mismatched_types(mistyped),
            )

        if mismatched:
            message += dedent(
                """

                The following methods of {I} were implemented with invalid signatures:
                {mismatched_methods}"""
            ).format(
                I=getname(self),
                mismatched_methods=self._format_mismatched_methods(mismatched),
            )
        return InvalidImplementation(message)

    def _format_missing_methods(self, missing):
        return "\n".join(
            sorted(
                [
                    "  - {name}{sig}".format(name=name, sig=self._signatures[name])
                    for name in missing
                ]
            )
        )

    def _format_mismatched_types(self, mistyped):
        return "\n".join(
            sorted(
                [
                    "  - {name}: {actual!r} is not a subtype "
                    "of expected type {expected!r}".format(
                        name=name,
                        actual=getname(bad_type),
                        expected=getname(self._signatures[name].type),
                    )
                    for name, bad_type in mistyped.items()
                ]
            )
        )

    def _format_mismatched_methods(self, mismatched):
        return "\n".join(
            sorted(
                [
                    "  - {name}{actual} != {name}{expected}".format(
                        name=name,
                        actual=bad_sig,
                        expected=self._signatures[name],
                    )
                    for name, bad_sig in mismatched.items()
                ]
            )
        )


empty_set = frozenset([])


[docs]class Interface(with_metaclass(InterfaceMeta)): """ Base class for interface definitions. An interface defines a set of methods and/or attributes that should be provided by one or more classes, which are called "implementations" of the interface. Classes can declare that they implement an interface ``I`` by subclassing ``implements(I)``. During class construction, the base class generated by ``implements(I)`` will verify that the new class correctly implements all the methods required by ``I``. Interfaces cannot be instantiated. Examples -------- **Defining an Interface:** .. code-block:: python class KeyValueStore(Interface): def get(self, key): pass def set(self, key, value): pass def delete(self, key): pass **Implementing an Interface:** .. code-block:: python class InMemoryKeyValueStore(implements(KeyValueStore)): def __init__(self): self.data = {} def get(self, key): return self.data[key] def set(self, key, value): self.data[key] = value def delete(self, key): del self.data[key] See Also -------- :func:`implements` """ # Don't consider these members part of the interface definition for # children of `Interface`. _INTERFACE_IGNORE_MEMBERS = {"__new__", "from_class"} def __new__(cls, *args, **kwargs): raise TypeError("Can't instantiate interface %s" % getname(cls))
[docs] @classmethod def from_class(cls, existing_class, subset=None, name=None): """Create an interface from an existing class. Parameters ---------- existing_class : type The type from which to extract an interface. subset : list[str], optional List of methods that should be included in the interface. Default is to use all attributes not defined in an empty class. name : str, optional Name of the generated interface. Default is ``existing_class.__name__ + 'Interface'``. Returns ------- interface : type A new interface class with stubs generated from ``existing_class``. """ if name is None: name = existing_class.__name__ + "Interface" if subset is None: subset = set(dir(existing_class)) - TRIVIAL_CLASS_ATTRIBUTES return InterfaceMeta( name, (Interface,), {name: static_get_type_attr(existing_class, name) for name in subset}, )
# Signature requirements are inherited, so make sure the base interface doesn't # require any methods of children. assert Interface._signatures == {} assert Interface._defaults == {} class ImplementsMeta(type): """ Metaclass for implementations of particular interfaces. """ def __new__(mcls, name, bases, clsdict, interfaces=empty_set): assert isinstance(interfaces, frozenset) newtype = super(ImplementsMeta, mcls).__new__(mcls, name, bases, clsdict) if interfaces: # Don't do checks on the types returned by ``implements``. return newtype errors = [] default_impls = {} default_providers = defaultdict(list) for iface in sorted(newtype.interfaces(), key=getname): try: defaults_from_iface = iface.verify(newtype) for name, impl in defaults_from_iface.items(): default_impls[name] = impl default_providers[name].append(iface) except InvalidImplementation as e: errors.append(e) # The list of providers for `name`, if there's more than one. duplicate_defaults = valfilter( lambda ifaces: len(ifaces) > 1, default_providers ) if duplicate_defaults: errors.append(_conflicting_defaults(newtype.__name__, duplicate_defaults)) else: for name, impl in default_impls.items(): setattr(newtype, name, impl) if not errors: return newtype elif len(errors) == 1: raise errors[0] else: raise InvalidImplementation("\n".join(map(str, errors))) def __init__(mcls, name, bases, clsdict, interfaces=empty_set): mcls._interfaces = interfaces super(ImplementsMeta, mcls).__init__(name, bases, clsdict) def interfaces(self): for elem in unique(self._interfaces_with_duplicates()): yield elem def _interfaces_with_duplicates(self): for elem in self._interfaces: yield elem for t in filter(is_a(ImplementsMeta), self.mro()): for elem in t._interfaces: yield elem def format_iface_method_docs(I): iface_name = getname(I) return "\n".join( [ "{iface_name}.{method_name}{sig}".format( iface_name=iface_name, method_name=method_name, sig=sig, ) for method_name, sig in sorted(list(I._signatures.items()), key=first) ] ) def _make_implements(): _memo = WeakKeyDictionary() def implements(*interfaces): """ Make a base for a class that implements one or more interfaces. Parameters ---------- *interfaces : tuple One or more subclasses of :class:`~interface.Interface`. Returns ------- base : type A type validating that subclasses must implement all interface methods of types in ``interfaces``. Examples -------- **Implementing an Interface:** .. code-block:: python class MyInterface(Interface): def method1(self, x): pass def method2(self, y): pass class MyImplementation(implements(MyInterface)): def method1(self, x): return x + 1 def method2(self, y): return y * 2 **Implementing Multiple Interfaces:** .. code-block:: python class I1(Interface): def method1(self, x): pass class I2(Interface): def method2(self, y): pass class ImplBoth(implements(I1, I2)): def method1(self, x): return x + 1 def method2(self, y): return y * 2 """ if not interfaces: raise TypeError("implements() requires at least one interface") interfaces = frozenset(interfaces) try: return _memo[interfaces] except KeyError: pass for I in interfaces: if not issubclass(I, Interface): raise TypeError("implements() expected an Interface, but got %s." % I) ordered_ifaces = tuple(sorted(interfaces, key=getname)) iface_names = list(map(getname, ordered_ifaces)) name = "Implements{}".format("_".join(iface_names)) doc = dedent( """\ Implementation of {interfaces}. Methods ------- {methods}""" ).format( interfaces=", ".join(iface_names), methods="\n".join(map(format_iface_method_docs, ordered_ifaces)), ) result = ImplementsMeta( name, (object,), {"__doc__": doc}, interfaces=interfaces, ) # NOTE: It's important for correct weak-memoization that this is set is # stored somewhere on the resulting type. assert result._interfaces is interfaces, "Interfaces not stored." _memo[interfaces] = result return result return implements implements = _make_implements() del _make_implements