diff --git a/linkml_runtime/dumpers/rdflib_dumper.py b/linkml_runtime/dumpers/rdflib_dumper.py index ac8e3df0..56587b21 100644 --- a/linkml_runtime/dumpers/rdflib_dumper.py +++ b/linkml_runtime/dumpers/rdflib_dumper.py @@ -10,8 +10,8 @@ from linkml_runtime.dumpers.dumper_root import Dumper -from linkml_runtime.linkml_model import SlotDefinition -from linkml_runtime.utils.schemaview import SchemaView, ElementName, PermissibleValue, PermissibleValueText +from linkml_runtime.linkml_model import ElementName, PermissibleValue, PermissibleValueText, SlotDefinition +from linkml_runtime.utils.schemaview import SchemaView from linkml_runtime.utils.yamlutils import YAMLRoot logger = logging.getLogger(__name__) diff --git a/linkml_runtime/index/object_index.py b/linkml_runtime/index/object_index.py index a473a1c2..d780b381 100644 --- a/linkml_runtime/index/object_index.py +++ b/linkml_runtime/index/object_index.py @@ -13,7 +13,7 @@ from typing import Any, Union from collections.abc import Mapping, Iterator -from linkml_runtime import SchemaView +from linkml_runtime.utils.schemaview import SchemaView from linkml_runtime.utils import eval_utils from linkml_runtime.utils.yamlutils import YAMLRoot diff --git a/linkml_runtime/loaders/rdflib_loader.py b/linkml_runtime/loaders/rdflib_loader.py index 758d13e9..a7497e7f 100644 --- a/linkml_runtime/loaders/rdflib_loader.py +++ b/linkml_runtime/loaders/rdflib_loader.py @@ -10,11 +10,17 @@ from rdflib.term import BNode, Literal from rdflib.namespace import RDF -from linkml_runtime import MappingError, DataNotFoundError -from linkml_runtime.linkml_model import ClassDefinitionName, TypeDefinition, EnumDefinition, ClassDefinition +from linkml_runtime import DataNotFoundError, MappingError +from linkml_runtime.linkml_model import ( + ClassDefinition, + ClassDefinitionName, + EnumDefinition, + SlotDefinition, + TypeDefinition, +) from linkml_runtime.loaders.loader_root import Loader from linkml_runtime.utils.formatutils import underscore -from linkml_runtime.utils.schemaview import SchemaView, SlotDefinition +from linkml_runtime.utils.schemaview import SchemaView from linkml_runtime.utils.yamlutils import YAMLRoot from pydantic import BaseModel diff --git a/linkml_runtime/processing/referencevalidator.py b/linkml_runtime/processing/referencevalidator.py index a167f634..f88b6191 100644 --- a/linkml_runtime/processing/referencevalidator.py +++ b/linkml_runtime/processing/referencevalidator.py @@ -19,7 +19,7 @@ import click import yaml -from linkml_runtime import SchemaView +from linkml_runtime.utils.schemaview import SchemaView from linkml_runtime.dumpers import yaml_dumper from linkml_runtime.linkml_model import ( SlotDefinition, diff --git a/linkml_runtime/utils/inference_utils.py b/linkml_runtime/utils/inference_utils.py index c8545d78..6a0bd3b5 100644 --- a/linkml_runtime/utils/inference_utils.py +++ b/linkml_runtime/utils/inference_utils.py @@ -4,7 +4,7 @@ from typing import Union, Optional, Any, Callable from jsonasobj2 import JsonObj, items -from linkml_runtime import SchemaView +from linkml_runtime.utils.schemaview import SchemaView from linkml_runtime.linkml_model import SlotDefinitionName, PermissibleValue, ClassDefinitionName from linkml_runtime.utils.enumerations import EnumDefinitionImpl from linkml_runtime.utils.eval_utils import eval_expr diff --git a/linkml_runtime/utils/schemaview.py b/linkml_runtime/utils/schemaview.py index 1be70a30..5356e54f 100644 --- a/linkml_runtime/utils/schemaview.py +++ b/linkml_runtime/utils/schemaview.py @@ -1,37 +1,68 @@ +"""SchemaView, a virtual schema layered on top of a schema plus its import closure.""" + +from __future__ import annotations + +import collections +import logging import os import sys import uuid -import logging -import collections -from functools import lru_cache -from copy import copy, deepcopy +import warnings from collections import defaultdict, deque -from pathlib import Path, PurePath -from typing import Optional, TypeVar from collections.abc import Mapping -import warnings -from urllib.parse import urlparse +from copy import copy, deepcopy +from dataclasses import dataclass +from enum import Enum +from functools import lru_cache +from pathlib import Path, PurePath +from typing import TYPE_CHECKING, Any, TypeVar, Union -from linkml_runtime.utils.namespaces import Namespaces from deprecated.classic import deprecated -from linkml_runtime.utils.context_utils import parse_import_map, map_import + +from linkml_runtime.exceptions import OrderingError +from linkml_runtime.linkml_model.meta import ( + AnonymousSlotExpression, + ClassDefinition, + ClassDefinitionName, + Definition, + DefinitionName, + Element, + ElementName, + EnumDefinition, + EnumDefinitionName, + PermissibleValueText, + SchemaDefinition, + SchemaDefinitionName, + SlotDefinition, + SlotDefinitionName, + SubsetDefinition, + SubsetDefinitionName, + TypeDefinition, + TypeDefinitionName, +) +from linkml_runtime.utils.context_utils import map_import, parse_import_map from linkml_runtime.utils.formatutils import camelcase, is_empty, sfx, underscore +from linkml_runtime.utils.namespaces import Namespaces from linkml_runtime.utils.pattern import PatternResolver -from linkml_runtime.linkml_model.meta import * -from linkml_runtime.exceptions import OrderingError -from enum import Enum + +if TYPE_CHECKING: + from collections.abc import Mapping + from types import NotImplementedType + + from linkml_runtime.utils.metamodelcore import URIorCURIE + logger = logging.getLogger(__name__) MAPPING_TYPE = str ## e.g. broad, exact, related, ... CACHE_SIZE = 1024 -SLOTS = 'slots' -CLASSES = 'classes' -ENUMS = 'enums' -SUBSETS = 'subsets' -TYPES = 'types' -WINDOWS = sys.platform == 'win32' +SLOTS = "slots" +CLASSES = "classes" +ENUMS = "enums" +SUBSETS = "subsets" +TYPES = "types" +WINDOWS = sys.platform == "win32" CLASS_NAME = Union[ClassDefinitionName, str] SLOT_NAME = Union[SlotDefinitionName, str] @@ -48,6 +79,8 @@ class OrderedBy(Enum): + """Permissible values for ordering.""" + RANK = "rank" LEXICAL = "lexical" PRESERVE = "preserve" @@ -57,7 +90,7 @@ class OrderedBy(Enum): """ -def _closure(f, x, reflexive=True, depth_first=True, **kwargs): +def _closure(f, x, reflexive: bool = True, depth_first: bool = True, **kwargs: dict[str, Any] | None) -> list: if reflexive: rv = [x] else: @@ -80,9 +113,11 @@ def _closure(f, x, reflexive=True, depth_first=True, **kwargs): return rv -def load_schema_wrap(path: str, **kwargs): +def load_schema_wrap(path: str, **kwargs: dict[str, Any]) -> SchemaDefinition: + """Load a schema.""" # import here to avoid circular imports from linkml_runtime.loaders.yaml_loader import YAMLLoader + yaml_loader = YAMLLoader() schema: SchemaDefinition schema = yaml_loader.load(path, target_class=SchemaDefinition, **kwargs) @@ -97,6 +132,13 @@ def load_schema_wrap(path: str, **kwargs): def is_absolute_path(path: str) -> bool: + """Test whether a string represents an absolute path. + + :param path: string representing a path + :type path: str + :return: true or false + :rtype: bool + """ if path.startswith("/"): return True # windows @@ -111,20 +153,18 @@ def is_absolute_path(path: str) -> bool: @dataclass class SchemaUsage: - """ - A usage of an element of a schema - """ + """A usage of an element of a schema.""" + used_by: ElementName slot: SlotDefinitionName metaslot: SlotDefinitionName used: ElementName - inferred: bool = None + inferred: bool | None = None @dataclass class SchemaView: - """ - A SchemaView provides a virtual schema layered on top of a schema plus its import closure + """A SchemaView provides a virtual schema layered on top of a schema plus its import closure. Most operations are parameterized by `imports`. If this is set to True (default), then the full import closure is considered when answering @@ -141,20 +181,35 @@ class SchemaView: - https://github.com/linkml/linkml/issues/270 """ - schema: SchemaDefinition = None - schema_map: dict[SchemaDefinitionName, SchemaDefinition] = None - importmap: Optional[Mapping[str, str]] = None + schema: SchemaDefinition | None = None + schema_map: dict[SchemaDefinitionName, SchemaDefinition] | None = None + importmap: Mapping[str, str] | None = None """Optional mapping between schema names and local paths/URLs""" modifications: int = 0 - uuid: str = None + uuid: str | None = None ## private vars -------- # cached hash - _hash: Optional[int] = None - - - def __init__(self, schema: Union[str, Path, SchemaDefinition], - importmap: Optional[dict[str, str]] = None, merge_imports: bool = False, base_dir: str = None): + _hash: int | None = None + + def __init__( + self, + schema: str | Path | SchemaDefinition, + importmap: dict[str, str] | None = None, + merge_imports: bool = False, + base_dir: str | None = None, + ) -> None: + """Initialize a SchemaView instance. + + :param schema: schema or path to schema to be viewed + :type schema: str | Path | SchemaDefinition + :param importmap: import mapping, defaults to None + :type importmap: dict[str, str] | None, optional + :param merge_imports: whether or not to merge imports, defaults to False + :type merge_imports: bool, optional + :param base_dir: base directory for import map, defaults to None + :type base_dir: str | None, optional + """ if isinstance(schema, Path): schema = str(schema) if isinstance(schema, str): @@ -169,7 +224,14 @@ def __init__(self, schema: Union[str, Path, SchemaDefinition], def __key(self): return self.schema.id, self.uuid, self.modifications - def __eq__(self, other): + def __eq__(self, other: object) -> bool | NotImplementedType: + """Test whether another object is equal to this one. + + :param other: other object + :type other: object + :return: true or false if the object is a schemaview; otherwise, raise an error + :rtype: bool | type[NotImplemented] + """ if isinstance(other, SchemaView): return self.__key() == other.__key() return NotImplemented @@ -181,6 +243,11 @@ def __hash__(self) -> int: @lru_cache(None) def namespaces(self) -> Namespaces: + """Return the namespaces present in a schema. + + :return: namespaces + :rtype: Namespaces + """ namespaces = Namespaces() for s in self.schema_map.values(): for cmap in self.schema.default_curi_maps: @@ -189,9 +256,8 @@ def namespaces(self) -> Namespaces: namespaces[prefix.prefix_prefix] = prefix.prefix_reference return namespaces - def load_import(self, imp: str, from_schema: SchemaDefinition = None): - """ - Handles import directives. + def load_import(self, imp: str, from_schema: SchemaDefinition | None = None) -> SchemaDefinition: + """Handle import directives. The value of the import can be: @@ -218,24 +284,23 @@ def load_import(self, imp: str, from_schema: SchemaDefinition = None): if from_schema is None: from_schema = self.schema from linkml_runtime import SCHEMA_DIRECTORY - default_import_map = { - "linkml:": str(SCHEMA_DIRECTORY) - } + + default_import_map = {"linkml:": str(SCHEMA_DIRECTORY)} importmap = {**default_import_map, **self.importmap} sname = map_import(importmap, self.namespaces, imp) if from_schema.source_file and not is_absolute_path(sname): base_dir = os.path.dirname(from_schema.source_file) else: base_dir = None - logger.info(f'Importing {imp} as {sname} from source {from_schema.source_file}; base_dir={base_dir}') - schema = load_schema_wrap(sname + '.yaml', base_dir=base_dir) + logger.info(f"Importing {imp} as {sname} from source {from_schema.source_file}; base_dir={base_dir}") + schema = load_schema_wrap(sname + ".yaml", base_dir=base_dir) return schema @lru_cache(None) - def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, inject_metadata=True) -> list[ - SchemaDefinitionName]: - """ - Return all imports + def imports_closure( + self, imports: bool = True, traverse: bool | None = None, inject_metadata: bool = True + ) -> list[SchemaDefinitionName]: + """Return all imports. Objects in imported classes override one another in a "python-like" order - from the point of view of the importing schema, imports will override one @@ -271,8 +336,8 @@ def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, if traverse is not None: warnings.warn( - 'traverse behaves identically to imports and will be removed in a future version. Use imports instead.', - DeprecationWarning + "traverse behaves identically to imports and will be removed in a future version. Use imports instead.", + DeprecationWarning, ) if not imports or (not traverse and traverse is not None): @@ -313,9 +378,7 @@ def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, if "/" in sn and ":" not in i: if WINDOWS: # This cannot be simplified. os.path.normpath() must be called before .as_posix() - i = PurePath( - os.path.normpath(PurePath(sn).parent / i) - ).as_posix() + i = PurePath(os.path.normpath(PurePath(sn).parent / i)).as_posix() else: i = os.path.normpath(str(Path(sn).parent / i)) todo.append(i) @@ -327,7 +390,7 @@ def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, visited.add(sn) # filter duplicates, keeping first entry - closure = list({k: None for k in closure}.keys()) + closure = list(dict.fromkeys(closure).keys()) if inject_metadata: for s in self.schema_map.values(): @@ -349,7 +412,8 @@ def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, @lru_cache(None) def all_schema(self, imports: bool = True) -> list[SchemaDefinition]: - """ + """Return all schemas. + :param imports: include imports closure :return: all schemas """ @@ -358,32 +422,30 @@ def all_schema(self, imports: bool = True) -> list[SchemaDefinition]: @deprecated("Use `all_classes` instead") @lru_cache(None) - def all_class(self, imports=True) -> dict[ClassDefinitionName, ClassDefinition]: - """ + def all_class(self, imports: bool = True) -> dict[ClassDefinitionName, ClassDefinition]: + """Return all classes. + :param imports: include imports closure :return: all classes in schema view """ return self._get_dict(CLASSES, imports) - def ordered(self, elements: ElementDict, ordered_by: Optional[OrderedBy] = None) -> ElementDict: - """ - Order a dictionary of elements with some ordering method in :class:`.OrderedBy` - """ + def ordered(self, elements: ElementDict, ordered_by: OrderedBy | None = None) -> ElementDict: + """Order a dictionary of elements with some ordering method in :class:`.OrderedBy`.""" if ordered_by in (OrderedBy.LEXICAL, OrderedBy.LEXICAL.value): return self._order_lexically(elements) - elif ordered_by in (OrderedBy.RANK, OrderedBy.RANK.value): + if ordered_by in (OrderedBy.RANK, OrderedBy.RANK.value): return self._order_rank(elements) - elif ordered_by in (OrderedBy.INHERITANCE, OrderedBy.INHERITANCE.value): + if ordered_by in (OrderedBy.INHERITANCE, OrderedBy.INHERITANCE.value): return self._order_inheritance(elements) - elif ordered_by is None or ordered_by in (OrderedBy.PRESERVE, OrderedBy.PRESERVE.value): + if ordered_by is None or ordered_by in (OrderedBy.PRESERVE, OrderedBy.PRESERVE.value): return elements - else: - raise ValueError(f"ordered_by must be in OrderedBy or None, got {ordered_by}") + raise ValueError(f"ordered_by must be in OrderedBy or None, got {ordered_by}") def _order_lexically(self, elements: ElementDict) -> ElementDict: - """ + """Order elements by name. + :param element: slots or class type to order - :param imports :return: all classes or slots sorted lexically in schema view """ ordered_list_of_names = [] @@ -396,11 +458,11 @@ def _order_lexically(self, elements: ElementDict) -> ElementDict: return ordered_elements def _order_rank(self, elements: ElementDict) -> ElementDict: - """ + """Order elements by rank. + :param elements: slots or classes to order :return: all classes or slots sorted by their rank in schema view """ - rank_map = {} unranked_map = {} rank_ordered_elements = {} @@ -418,9 +480,7 @@ def _order_rank(self, elements: ElementDict) -> ElementDict: return rank_ordered_elements def _order_inheritance(self, elements: DefDict) -> DefDict: - """ - sort classes such that if C is a child of P then C appears after P in the list - """ + """Sort classes such that if C is a child of P then C appears after P in the list.""" clist = list(elements.values()) slist = [] # sorted can_add = False @@ -428,11 +488,8 @@ def _order_inheritance(self, elements: DefDict) -> DefDict: for i in range(len(clist)): candidate = clist[i] can_add = False - if candidate.is_a is None: + if candidate.is_a is None or candidate.is_a in [p.name for p in slist]: can_add = True - else: - if candidate.is_a in [p.name for p in slist]: - can_add = True if can_add: slist = slist + [candidate] del clist[i] @@ -443,35 +500,39 @@ def _order_inheritance(self, elements: DefDict) -> DefDict: return {s.name: s for s in slist} @lru_cache(None) - def all_classes(self, ordered_by=OrderedBy.PRESERVE, imports=True) -> dict[ClassDefinitionName, ClassDefinition]: - """ + def all_classes( + self, ordered_by: OrderedBy = OrderedBy.PRESERVE, imports: bool = True + ) -> dict[ClassDefinitionName, ClassDefinition]: + """Retrieve all classes from a schema. + :param ordered_by: an enumerated parameter that returns all the classes in the order specified. :param imports: include imports closure :return: all classes in schema view """ classes = copy(self._get_dict(CLASSES, imports)) - classes = self.ordered(classes, ordered_by=ordered_by) - return classes + return self.ordered(classes, ordered_by=ordered_by) @deprecated("Use `all_slots` instead") @lru_cache(None) - def all_slot(self, **kwargs) -> dict[SlotDefinitionName, SlotDefinition]: - """ + def all_slot(self, **kwargs: dict[str, Any]) -> dict[SlotDefinitionName, SlotDefinition]: + """Retrieve all slots from the schema. + :param imports: include imports closure :return: all slots in schema view """ return self.all_slots(**kwargs) @lru_cache(None) - def all_slots(self, ordered_by=OrderedBy.PRESERVE, imports=True, attributes=True) -> dict[ - SlotDefinitionName, SlotDefinition]: - """ + def all_slots( + self, ordered_by: OrderedBy = OrderedBy.PRESERVE, imports: bool = True, attributes: bool = True + ) -> dict[SlotDefinitionName, SlotDefinition]: + """Retrieve all slots from the schema. + :param ordered_by: an enumerated parameter that returns all the slots in the order specified. :param imports: include imports closure :param attributes: include attributes as slots or not, default is to include. :return: all slots in schema view """ - slots = copy(self._get_dict(SLOTS, imports)) if attributes: for c in self.all_classes().values(): @@ -479,21 +540,22 @@ def all_slots(self, ordered_by=OrderedBy.PRESERVE, imports=True, attributes=True if aname not in slots: slots[aname] = a - slots = self.ordered(slots, ordered_by=ordered_by) - return slots + return self.ordered(slots, ordered_by=ordered_by) @deprecated("Use `all_enums` instead") @lru_cache(None) - def all_enum(self, imports=True) -> dict[EnumDefinitionName, EnumDefinition]: - """ + def all_enum(self, imports: bool = True) -> dict[EnumDefinitionName, EnumDefinition]: + """Retrieve all enums from the schema. + :param imports: include imports closure :return: all enums in schema view """ - return self._get_dict(ENUMS, imports) + return self.all_enums(imports) @lru_cache(None) - def all_enums(self, imports=True) -> dict[EnumDefinitionName, EnumDefinition]: - """ + def all_enums(self, imports: bool = True) -> dict[EnumDefinitionName, EnumDefinition]: + """Retrieve all enums from the schema. + :param imports: include imports closure :return: all enums in schema view """ @@ -501,32 +563,36 @@ def all_enums(self, imports=True) -> dict[EnumDefinitionName, EnumDefinition]: @deprecated("Use `all_types` instead") @lru_cache(None) - def all_type(self, imports=True) -> dict[TypeDefinitionName, TypeDefinition]: - """ + def all_type(self, imports: bool = True) -> dict[TypeDefinitionName, TypeDefinition]: + """Retrieve all types from the schema. + :param imports: include imports closure :return: all types in schema view """ - return self._get_dict(TYPES, imports) + return self.all_types(imports) @lru_cache(None) - def all_types(self, imports=True) -> dict[TypeDefinitionName, TypeDefinition]: - """ + def all_types(self, imports: bool = True) -> dict[TypeDefinitionName, TypeDefinition]: + """Retrieve all types from the schema. + :param imports: include imports closure :return: all types in schema view """ return self._get_dict(TYPES, imports) @deprecated("Use `all_subsets` instead") - def all_subset(self, imports=True) -> dict[SubsetDefinitionName, SubsetDefinition]: - """ + def all_subset(self, imports: bool = True) -> dict[SubsetDefinitionName, SubsetDefinition]: + """Retrieve all subsets from the schema. + :param imports: include imports closure :return: all subsets in schema view """ - return self._get_dict(SUBSETS, imports) + return self.all_subsets(imports) @lru_cache(None) - def all_subsets(self, imports=True) -> dict[SubsetDefinitionName, SubsetDefinition]: - """ + def all_subsets(self, imports: bool = True) -> dict[SubsetDefinitionName, SubsetDefinition]: + """Retrieve all subsets from the schema. + :param imports: include imports closure :return: all subsets in schema view """ @@ -534,22 +600,18 @@ def all_subsets(self, imports=True) -> dict[SubsetDefinitionName, SubsetDefiniti @deprecated("Use `all_elements` instead") @lru_cache(None) - def all_element(self, imports=True) -> dict[ElementName, Element]: - """ + def all_element(self, imports: bool = True) -> dict[ElementName, Element]: + """Retrieve all elements from the schema. + :param imports: include imports closure :return: all elements in schema view """ - all_classes = self.all_classes(imports=imports) - all_slots = self.all_slots(imports=imports) - all_enums = self.all_enums(imports=imports) - all_types = self.all_types(imports=imports) - all_subsets = self.all_subsets(imports=imports) - # {**a,**b} syntax merges dictionary a and b into a single dictionary, removing duplicates. - return {**all_classes, **all_slots, **all_enums, **all_types, **all_subsets} + return self.all_elements(imports) @lru_cache(None) - def all_elements(self, imports=True) -> dict[ElementName, Element]: - """ + def all_elements(self, imports: bool = True) -> dict[ElementName, Element]: + """Retrieve all elements from the schema. + :param imports: include imports closure :return: all elements in schema view """ @@ -561,7 +623,7 @@ def all_elements(self, imports=True) -> dict[ElementName, Element]: # {**a,**b} syntax merges dictionary a and b into a single dictionary, removing duplicates. return {**all_classes, **all_slots, **all_enums, **all_types, **all_subsets} - def _get_dict(self, slot_name: str, imports=True) -> dict: + def _get_dict(self, slot_name: str, imports: bool = True) -> dict: schemas = self.all_schema(imports) d = {} # pdb.set_trace() @@ -576,8 +638,7 @@ def _get_dict(self, slot_name: str, imports=True) -> dict: @lru_cache(None) def slot_name_mappings(self) -> dict[str, SlotDefinition]: - """ - Mapping between processed safe slot names (following naming conventions) and slots. + """Return a mapping between processed safe slot names (following naming conventions) and slots. For example, a slot may have name 'lives at', the code-safe version is `lives_at` @@ -590,8 +651,7 @@ def slot_name_mappings(self) -> dict[str, SlotDefinition]: @lru_cache(None) def class_name_mappings(self) -> dict[str, ClassDefinition]: - """ - Mapping between processed safe class names (following naming conventions) and classes. + """Return a mapping between processed safe class names (following naming conventions) and classes. For example, a class may have name 'named thing', the code-safe version is `NamedThing` @@ -604,13 +664,14 @@ def class_name_mappings(self) -> dict[str, ClassDefinition]: @lru_cache(None) def in_schema(self, element_name: ElementName) -> SchemaDefinitionName: - """ + """Retrieve the name of the schema in which an element is defined. + :param element_name: :return: name of schema in which element is defined """ ix = self.element_by_schema_map() if element_name not in ix: - raise ValueError(f'Element {element_name} not in any schema') + raise ValueError(f"Element {element_name} not in any schema") return ix[element_name] @lru_cache(None) @@ -627,8 +688,9 @@ def element_by_schema_map(self) -> dict[ElementName, SchemaDefinitionName]: return ix @lru_cache(None) - def get_class(self, class_name: CLASS_NAME, imports=True, strict=False) -> ClassDefinition: - """ + def get_class(self, class_name: CLASS_NAME, imports: bool = True, strict: bool = False) -> ClassDefinition: + """Retrieve a class from the schema. + :param class_name: name of the class to be retrieved :param imports: include import closure :return: class definition @@ -636,12 +698,14 @@ def get_class(self, class_name: CLASS_NAME, imports=True, strict=False) -> Class c = self.all_classes(imports=imports).get(class_name, None) if strict and c is None: raise ValueError(f'No such class as "{class_name}"') - else: - return c + return c @lru_cache(None) - def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=False) -> SlotDefinition: - """ + def get_slot( + self, slot_name: SLOT_NAME, imports: bool = True, attributes: bool = True, strict: bool = False + ) -> SlotDefinition: + """Retrieve a slot from the schema. + :param slot_name: name of the slot to be retrieved :param imports: include import closure :param attributes: include attributes @@ -663,8 +727,9 @@ def get_slot(self, slot_name: SLOT_NAME, imports=True, attributes=True, strict=F return slot @lru_cache(None) - def get_subset(self, subset_name: SUBSET_NAME, imports=True, strict=False) -> SubsetDefinition: - """ + def get_subset(self, subset_name: SUBSET_NAME, imports: bool = True, strict: bool = False) -> SubsetDefinition: + """Retrieve a subset from the schema. + :param subset_name: name of the subsey to be retrieved :param imports: include import closure :return: subset definition @@ -672,12 +737,12 @@ def get_subset(self, subset_name: SUBSET_NAME, imports=True, strict=False) -> Su s = self.all_subsets(imports).get(subset_name, None) if strict and s is None: raise ValueError(f'No such subset as "{subset_name}"') - else: - return s + return s @lru_cache(None) - def get_enum(self, enum_name: ENUM_NAME, imports=True, strict=False) -> EnumDefinition: - """ + def get_enum(self, enum_name: ENUM_NAME, imports: bool = True, strict: bool = False) -> EnumDefinition: + """Retrieve an enum from the schema. + :param enum_name: name of the enum to be retrieved :param imports: include import closure :return: enum definition @@ -685,12 +750,12 @@ def get_enum(self, enum_name: ENUM_NAME, imports=True, strict=False) -> EnumDefi e = self.all_enums(imports).get(enum_name, None) if strict and e is None: raise ValueError(f'No such subset as "{enum_name}"') - else: - return e + return e @lru_cache(None) - def get_type(self, type_name: TYPE_NAME, imports=True, strict=False) -> TypeDefinition: - """ + def get_type(self, type_name: TYPE_NAME, imports: bool = True, strict: bool = False) -> TypeDefinition: + """Retrieve a type from the schema. + :param type_name: name of the type to be retrieved :param imports: include import closure :return: type definition @@ -698,22 +763,19 @@ def get_type(self, type_name: TYPE_NAME, imports=True, strict=False) -> TypeDefi t = self.all_types(imports).get(type_name, None) if strict and t is None: raise ValueError(f'No such subset as "{type_name}"') - else: - return t + return t - def _parents(self, e: Element, imports=True, mixins=True, is_a=True) -> list[ElementName]: - if mixins: - parents = copy(e.mixins) - else: - parents = [] + def _parents(self, e: Element, imports: bool = True, mixins: bool = True, is_a: bool = True) -> list[ElementName]: + parents = copy(e.mixins) if mixins else [] if e.is_a is not None and is_a: parents.append(e.is_a) return parents @lru_cache(None) - def class_parents(self, class_name: CLASS_NAME, imports=True, mixins=True, is_a=True) -> list[ClassDefinitionName]: - """ - :param class_name: child class name + def class_parents( + self, class_name: CLASS_NAME, imports: bool = True, mixins: bool = True, is_a: bool = True + ) -> list[ClassDefinitionName]: + """:param class_name: child class name :param imports: include import closure :param mixins: include mixins (default is True) :return: all direct parent class names (is_a and mixins) @@ -722,9 +784,10 @@ def class_parents(self, class_name: CLASS_NAME, imports=True, mixins=True, is_a= return self._parents(cls, imports, mixins, is_a) @lru_cache(None) - def enum_parents(self, enum_name: ENUM_NAME, imports=False, mixins=False, is_a=True) -> list[EnumDefinitionName]: - """ - :param enum_name: child enum name + def enum_parents( + self, enum_name: ENUM_NAME, imports: bool = False, mixins: bool = False, is_a: bool = True + ) -> list[EnumDefinitionName]: + """:param enum_name: child enum name :param imports: include import closure (False) :param mixins: include mixins (default is False) :return: all direct parent enum names (is_a and mixins) @@ -733,10 +796,10 @@ def enum_parents(self, enum_name: ENUM_NAME, imports=False, mixins=False, is_a=T return self._parents(e, imports, mixins, is_a=is_a) @lru_cache(None) - def permissible_value_parent(self, permissible_value: str, enum_name: ENUM_NAME) -> Union[ - str, PermissibleValueText, None, ValueError]: - """ - :param enum_name: child enum name + def permissible_value_parent( + self, permissible_value: str, enum_name: ENUM_NAME + ) -> str | PermissibleValueText | None | ValueError: + """:param enum_name: child enum name :param permissible_value: permissible value :return: all direct parent enum names (is_a) """ @@ -750,10 +813,10 @@ def permissible_value_parent(self, permissible_value: str, enum_name: ENUM_NAME) return [] @lru_cache(None) - def permissible_value_children(self, permissible_value: str, enum_name: ENUM_NAME) -> Union[ - str, PermissibleValueText, None, ValueError]: - """ - :param enum_name: parent enum name + def permissible_value_children( + self, permissible_value: str, enum_name: ENUM_NAME + ) -> str | PermissibleValueText | None | ValueError: + """:param enum_name: parent enum name :param permissible_value: permissible value :return: all direct child permissible values (is_a) @@ -769,7 +832,6 @@ def permissible_value_children(self, permissible_value: str, enum_name: ENUM_NAM is_a: BIRD """ - enum = self.get_enum(enum_name, strict=True) children = [] if enum: @@ -784,8 +846,11 @@ def permissible_value_children(self, permissible_value: str, enum_name: ENUM_NAM raise ValueError(f'No such enum as "{enum_name}"') @lru_cache(None) - def slot_parents(self, slot_name: SLOT_NAME, imports=True, mixins=True, is_a=True) -> list[SlotDefinitionName]: - """ + def slot_parents( + self, slot_name: SLOT_NAME, imports: bool = True, mixins: bool = True, is_a: bool = True + ) -> list[SlotDefinitionName]: + """Return the parent of a slot, if it exists. + :param slot_name: child slot name :param imports: include import closure :param mixins: include mixins (default is True) @@ -794,12 +859,12 @@ def slot_parents(self, slot_name: SLOT_NAME, imports=True, mixins=True, is_a=Tru s = self.get_slot(slot_name, imports, strict=True) if s: return self._parents(s, imports, mixins, is_a) - else: - return [] + return [] @lru_cache(None) - def type_parents(self, type_name: TYPE_NAME, imports=True) -> list[TypeDefinitionName]: - """ + def type_parents(self, type_name: TYPE_NAME, imports: bool = True) -> list[TypeDefinitionName]: + """Return the parent of a type, if it exists. + :param type_name: child type name :param imports: include import closure :return: all direct parent enum names (is_a and mixins) @@ -807,13 +872,12 @@ def type_parents(self, type_name: TYPE_NAME, imports=True) -> list[TypeDefinitio typ = self.get_type(type_name, imports, strict=True) if typ.typeof: return [typ.typeof] - else: - return [] + return [] @lru_cache(None) def get_children(self, name: str, mixin: bool = True) -> list[str]: - """ - get the children of an element (any class, slot, enum, type) + """Get the children of an element (any class, slot, enum, type). + :param name: name of the parent element :param mixin: include mixins :return: list of child element @@ -828,8 +892,11 @@ def get_children(self, name: str, mixin: bool = True) -> list[str]: return children @lru_cache(None) - def class_children(self, class_name: CLASS_NAME, imports=True, mixins=True, is_a=True) -> list[ClassDefinitionName]: - """ + def class_children( + self, class_name: CLASS_NAME, imports: bool = True, mixins: bool = True, is_a: bool = True + ) -> list[ClassDefinitionName]: + """Return class children. + :param class_name: parent class name :param imports: include import closure :param mixins: include mixins (default is True) @@ -840,8 +907,11 @@ def class_children(self, class_name: CLASS_NAME, imports=True, mixins=True, is_a return [x.name for x in elts if (x.is_a == class_name and is_a) or (mixins and class_name in x.mixins)] @lru_cache(None) - def slot_children(self, slot_name: SLOT_NAME, imports=True, mixins=True, is_a=True) -> list[SlotDefinitionName]: - """ + def slot_children( + self, slot_name: SLOT_NAME, imports: bool = True, mixins: bool = True, is_a: bool = True + ) -> list[SlotDefinitionName]: + """Return slot children. + :param slot_name: parent slot name :param imports: include import closure :param mixins: include mixins (default is True) @@ -852,10 +922,16 @@ def slot_children(self, slot_name: SLOT_NAME, imports=True, mixins=True, is_a=Tr return [x.name for x in elts if (x.is_a == slot_name and is_a) or (mixins and slot_name in x.mixins)] @lru_cache(None) - def class_ancestors(self, class_name: CLASS_NAME, imports=True, mixins=True, reflexive=True, is_a=True, - depth_first=True) -> list[ClassDefinitionName]: - """ - Closure of class_parents method + def class_ancestors( + self, + class_name: CLASS_NAME, + imports: bool = True, + mixins: bool = True, + reflexive: bool = True, + is_a: bool = True, + depth_first: bool = True, + ) -> list[ClassDefinitionName]: + """Return the closure of class_parents method. :param class_name: query class :param imports: include import closure @@ -865,45 +941,63 @@ def class_ancestors(self, class_name: CLASS_NAME, imports=True, mixins=True, ref :param depth_first: :return: ancestor class names """ - return _closure(lambda x: self.class_parents(x, imports=imports, mixins=mixins, is_a=is_a), - class_name, - reflexive=reflexive, depth_first=depth_first) + return _closure( + lambda x: self.class_parents(x, imports=imports, mixins=mixins, is_a=is_a), + class_name, + reflexive=reflexive, + depth_first=depth_first, + ) @lru_cache(None) - def permissible_value_ancestors(self, permissible_value_text: str, - enum_name: ENUM_NAME, - reflexive=True, - depth_first=True) -> list[str]: - """ - Closure of permissible_value_parents method - :enum - """ + def permissible_value_ancestors( + self, permissible_value_text: str, enum_name: ENUM_NAME, reflexive: bool = True, depth_first: bool = True + ) -> list[str]: + """Return the closure of permissible_value_parents method. - return _closure(lambda x: self.permissible_value_parent(x, enum_name), - permissible_value_text, - reflexive=reflexive, - depth_first=depth_first) + :param permissible_value_text: + :type permissible_value_text: str + :param enum_name: + :type enum_name: ENUM_NAME + :param reflexive: ..., defaults to True + :type reflexive: bool, optional + :param depth_first: ..., defaults to True + :type depth_first: bool, optional + :return: closure of permissible_value_parents + :rtype: list[str] + """ + return _closure( + lambda x: self.permissible_value_parent(x, enum_name), + permissible_value_text, + reflexive=reflexive, + depth_first=depth_first, + ) + + @lru_cache(None) + def permissible_value_descendants( + self, permissible_value_text: str, enum_name: ENUM_NAME, reflexive: bool = True, depth_first: bool = True + ) -> list[str]: + """Return the closure of permissible_value_children method. - @lru_cache(None) - def permissible_value_descendants(self, permissible_value_text: str, - enum_name: ENUM_NAME, - reflexive=True, - depth_first=True) -> list[str]: - """ - Closure of permissible_value_children method :enum """ - - return _closure(lambda x: self.permissible_value_children(x, enum_name), - permissible_value_text, - reflexive=reflexive, - depth_first=depth_first) - - @lru_cache(None) - def enum_ancestors(self, enum_name: ENUM_NAME, imports=True, mixins=True, reflexive=True, is_a=True, - depth_first=True) -> list[EnumDefinitionName]: - """ - Closure of enum_parents method + return _closure( + lambda x: self.permissible_value_children(x, enum_name), + permissible_value_text, + reflexive=reflexive, + depth_first=depth_first, + ) + + @lru_cache(None) + def enum_ancestors( + self, + enum_name: ENUM_NAME, + imports: bool = True, + mixins: bool = True, + reflexive: bool = True, + is_a: bool = True, + depth_first: bool = True, + ) -> list[EnumDefinitionName]: + """Return the closure of enum_parents method. :param enum_name: query enum :param imports: include import closure @@ -913,15 +1007,18 @@ def enum_ancestors(self, enum_name: ENUM_NAME, imports=True, mixins=True, reflex :param depth_first: :return: ancestor enum names """ - return _closure(lambda x: self.enum_parents(x, imports=imports, mixins=mixins, is_a=is_a), - enum_name, - reflexive=reflexive, depth_first=depth_first) + return _closure( + lambda x: self.enum_parents(x, imports=imports, mixins=mixins, is_a=is_a), + enum_name, + reflexive=reflexive, + depth_first=depth_first, + ) @lru_cache(None) - def type_ancestors(self, type_name: TYPES, imports=True, reflexive=True, depth_first=True) -> list[ - TypeDefinitionName]: - """ - All ancestors of a type via typeof + def type_ancestors( + self, type_name: TYPES, imports: bool = True, reflexive: bool = True, depth_first: bool = True + ) -> list[TypeDefinitionName]: + """Return all ancestors of a type via typeof. :param type_name: query type :param imports: include import closure @@ -929,15 +1026,15 @@ def type_ancestors(self, type_name: TYPES, imports=True, reflexive=True, depth_f :param depth_first: :return: ancestor class names """ - return _closure(lambda x: self.type_parents(x, imports=imports), - type_name, - reflexive=reflexive, depth_first=depth_first) + return _closure( + lambda x: self.type_parents(x, imports=imports), type_name, reflexive=reflexive, depth_first=depth_first + ) @lru_cache(None) - def slot_ancestors(self, slot_name: SLOT_NAME, imports=True, mixins=True, reflexive=True, is_a=True) -> list[ - SlotDefinitionName]: - """ - Closure of slot_parents method + def slot_ancestors( + self, slot_name: SLOT_NAME, imports: bool = True, mixins: bool = True, reflexive: bool = True, is_a: bool = True + ) -> list[SlotDefinitionName]: + """Return the closure of slot_parents method. :param slot_name: query slot :param imports: include import closure @@ -946,15 +1043,20 @@ def slot_ancestors(self, slot_name: SLOT_NAME, imports=True, mixins=True, reflex :param reflexive: include self in set of ancestors :return: ancestor slot names """ - return _closure(lambda x: self.slot_parents(x, imports=imports, mixins=mixins, is_a=is_a), - slot_name, - reflexive=reflexive) + return _closure( + lambda x: self.slot_parents(x, imports=imports, mixins=mixins, is_a=is_a), slot_name, reflexive=reflexive + ) @lru_cache(None) - def class_descendants(self, class_name: CLASS_NAME, imports=True, mixins=True, reflexive=True, is_a=True) -> list[ - ClassDefinitionName]: - """ - Closure of class_children method + def class_descendants( + self, + class_name: CLASS_NAME, + imports: bool = True, + mixins: bool = True, + reflexive: bool = True, + is_a: bool = True, + ) -> list[ClassDefinitionName]: + """Return the closure of class_children method. :param class_name: query class :param imports: include import closure @@ -963,14 +1065,15 @@ def class_descendants(self, class_name: CLASS_NAME, imports=True, mixins=True, r :param reflexive: include self in set of descendants :return: descendants class names """ - return _closure(lambda x: self.class_children(x, imports=imports, mixins=mixins, is_a=is_a), class_name, - reflexive=reflexive) + return _closure( + lambda x: self.class_children(x, imports=imports, mixins=mixins, is_a=is_a), class_name, reflexive=reflexive + ) @lru_cache(None) - def slot_descendants(self, slot_name: SLOT_NAME, imports=True, mixins=True, reflexive=True, is_a=True) -> list[ - SlotDefinitionName]: - """ - Closure of slot_children method + def slot_descendants( + self, slot_name: SLOT_NAME, imports: bool = True, mixins: bool = True, reflexive: bool = True, is_a: bool = True + ) -> list[SlotDefinitionName]: + """Return the closure of slot_children method. :param slot_name: query slot :param imports: include import closure @@ -979,90 +1082,92 @@ def slot_descendants(self, slot_name: SLOT_NAME, imports=True, mixins=True, refl :param reflexive: include self in set of descendants :return: descendants slot names """ - return _closure(lambda x: self.slot_children(x, imports=imports, mixins=mixins, is_a=is_a), slot_name, - reflexive=reflexive) + return _closure( + lambda x: self.slot_children(x, imports=imports, mixins=mixins, is_a=is_a), slot_name, reflexive=reflexive + ) @lru_cache(None) - def class_roots(self, imports=True, mixins=True, is_a=True) -> list[ClassDefinitionName]: - """ - All classes that have no parents + def class_roots(self, imports: bool = True, mixins: bool = True, is_a: bool = True) -> list[ClassDefinitionName]: + """Return all classes that have no parents. + :param imports: :param mixins: :param is_a: include is_a parents (default is True) :return: """ - return [c - for c in self.all_classes(imports=imports) - if self.class_parents(c, mixins=mixins, is_a=is_a, imports=imports) == []] + return [ + c + for c in self.all_classes(imports=imports) + if self.class_parents(c, mixins=mixins, is_a=is_a, imports=imports) == [] + ] @lru_cache(None) - def class_leaves(self, imports=True, mixins=True, is_a=True) -> list[ClassDefinitionName]: - """ - All classes that have no children + def class_leaves(self, imports: bool = True, mixins: bool = True, is_a: bool = True) -> list[ClassDefinitionName]: + """Return all classes that have no children. + :param imports: :param mixins: :param is_a: include is_a parents (default is True) :return: """ - return [c - for c in self.all_classes(imports=imports) - if self.class_children(c, mixins=mixins, is_a=is_a, imports=imports) == []] + return [ + c + for c in self.all_classes(imports=imports) + if self.class_children(c, mixins=mixins, is_a=is_a, imports=imports) == [] + ] @lru_cache(None) - def slot_roots(self, imports=True, mixins=True) -> list[SlotDefinitionName]: - """ - All slotes that have no parents + def slot_roots(self, imports: bool = True, mixins: bool = True) -> list[SlotDefinitionName]: + """Return all slots that have no parents. + :param imports: :param mixins: :return: """ - return [c - for c in self.all_slots(imports=imports) - if self.slot_parents(c, mixins=mixins, imports=imports) == []] + return [ + c for c in self.all_slots(imports=imports) if self.slot_parents(c, mixins=mixins, imports=imports) == [] + ] @lru_cache(None) - def slot_leaves(self, imports=True, mixins=True) -> list[SlotDefinitionName]: - """ - All slotes that have no children + def slot_leaves(self, imports: bool = True, mixins: bool = True) -> list[SlotDefinitionName]: + """Return all slots that have no children. + :param imports: :param mixins: :return: """ - return [c - for c in self.all_slots(imports=imports) - if self.slot_children(c, mixins=mixins, imports=imports) == []] + return [ + c for c in self.all_slots(imports=imports) if self.slot_children(c, mixins=mixins, imports=imports) == [] + ] @lru_cache(None) def is_multivalued(self, slot_name: SlotDefinition) -> bool: - """ - returns True if slot is multivalued, else returns False + """Return True if slot is multivalued, else returns False. + :param slot_name: slot to test for multivalued :return boolean: """ induced_slot = self.induced_slot(slot_name) - return True if induced_slot.multivalued else False + return bool(induced_slot.multivalued) @lru_cache(None) def slot_is_true_for_metadata_property(self, slot_name: SlotDefinition, metadata_property: str) -> bool: - """ - Returns true if the value of the provided "metadata_property" is True. For example, - sv.slot_is_true_for_metadata_property('id','identifier') + """Return true if the value of the provided "metadata_property" is True. + + For example, sv.slot_is_true_for_metadata_property('id','identifier') will return True if the slot id has the identifier property set to True. :param slot_name: slot to test for multivalued :param metadata_property: controlled vocabulary for boolean attribtues :return: boolean """ - induced_slot = self.induced_slot(slot_name) if type(getattr(induced_slot, metadata_property)) == bool: return True if getattr(induced_slot, metadata_property) else False - else: - raise ValueError(f'property to introspect must be of type "boolean"') + raise ValueError('property to introspect must be of type "boolean"') - def get_element(self, element: Union[ElementName, Element], imports=True) -> Element: - """ - Fetch an element by name + def get_element(self, element: ElementName | Element, imports: bool = True) -> Element: + """Fetch an element by name. :param element: query element :param imports: include imports closure @@ -1081,10 +1186,18 @@ def get_element(self, element: Union[ElementName, Element], imports=True) -> Ele e = self.get_subset(element, imports=imports) return e - def get_uri(self, element: Union[ElementName, Element], imports=True, expand=False, native=False, use_element_type=False) -> str: - """ - Return the CURIE or URI for a schema element. If the schema defines a specific URI, this is - used, otherwise this is constructed from the default prefix combined with the element name + def get_uri( + self, + element: ElementName | Element, + imports: bool = True, + expand: bool = False, + native: bool = False, + use_element_type: bool = False, + ) -> str: + """Return the CURIE or URI for a schema element. + + If the schema defines a specific URI, this is used; + otherwise this is constructed from the default prefix combined with the element name. :param element: name of schema element :param imports: include imports closure @@ -1104,39 +1217,38 @@ def get_uri(self, element: Union[ElementName, Element], imports=True, expand=Fal uri = e.uri e_name = underscore(e.name) else: - raise ValueError(f'Must be class or slot or type: {e}') + raise ValueError(f"Must be class or slot or type: {e}") if uri is None or native: if e.from_schema is not None: schema = next((sc for sc in self.schema_map.values() if sc.id == e.from_schema), None) if schema is None: - raise ValueError(f'Cannot find {e.from_schema} in schema_map') + raise ValueError(f"Cannot find {e.from_schema} in schema_map") else: schema = self.schema_map[self.in_schema(e.name)] if use_element_type: - e_type = e.class_name.split("_",1)[0] # for example "class_definition" - e_type_path = f"{e_type}/" + e_type = e.class_name.split("_", 1)[0] # for example "class_definition" + e_type_path = f"{e_type}/" else: e_type_path = "" pfx = schema.default_prefix - # To construct the uri we have to find out if the schema has a default_prefix + # To construct the uri we have to find out if the schema has a default_prefix # or if a pseudo "prefix" was derived from the schema id. if pfx == sfx(str(schema.id)): # no prefix defined in schema - uri = f'{pfx}{e_type_path}{e_name}' + uri = f"{pfx}{e_type_path}{e_name}" else: - uri = f'{pfx}:{e_type_path}{e_name}' + uri = f"{pfx}:{e_type_path}{e_name}" if expand: return self.expand_curie(uri) - else: - return uri + return uri def expand_curie(self, uri: str) -> str: - """ - Expands a URI or CURIE to a full URI + """Expand a URI or CURIE to a full URI. + :param uri: :return: URI as a string """ - if ':' in uri: - parts = uri.split(':') + if ":" in uri: + parts = uri.split(":") if len(parts) == 2: [pfx, local_id] = parts ns = self.namespaces() @@ -1146,8 +1258,9 @@ def expand_curie(self, uri: str) -> str: @lru_cache(CACHE_SIZE) def get_elements_applicable_by_identifier(self, identifier: str) -> list[str]: - """ - Get a model element by identifier. The model element corresponding to the given identifier as available via + """Get a model element by identifier. + + The model element corresponding to the given identifier as available via the id_prefixes mapped to that element. :param identifier: @@ -1156,14 +1269,17 @@ def get_elements_applicable_by_identifier(self, identifier: str) -> list[str]: """ elements = self.get_elements_applicable_by_prefix(self.namespaces().prefix_for(identifier)) if len(elements) == 0: - logger.warning("no element found for the given curie using id_prefixes attribute" - ": %s, try get_mappings method?", identifier) + logger.warning( + "no element found for the given curie using id_prefixes attribute: %s, try get_mappings method?", + identifier, + ) return elements @lru_cache(CACHE_SIZE) def get_elements_applicable_by_prefix(self, prefix: str) -> list[str]: - """ - Get a model element by prefix. The model element corresponding to the given prefix as available via + """Get a model element by prefix. + + The model element corresponding to the given prefix as available via the id_prefixes mapped to that element. :param prefix: the prefix of a CURIE @@ -1173,22 +1289,21 @@ def get_elements_applicable_by_prefix(self, prefix: str) -> list[str]: applicable_elements = [] elements = self.all_elements() for category, category_element in elements.items(): - if hasattr(category_element, 'id_prefixes') and prefix in category_element.id_prefixes: + if hasattr(category_element, "id_prefixes") and prefix in category_element.id_prefixes: applicable_elements.append(category_element.name) return applicable_elements @lru_cache(None) def all_aliases(self) -> list[str]: - """ - Get the aliases + """Get all aliases. :return: list of aliases """ element_aliases = {} for e, el in self.all_elements().items(): - if el.name not in element_aliases.keys(): + if el.name not in element_aliases: element_aliases[el.name] = [] if el.aliases and el.aliases is not None: for a in el.aliases: @@ -1200,10 +1315,10 @@ def all_aliases(self) -> list[str]: return element_aliases @lru_cache(None) - def get_mappings(self, element_name: ElementName = None, imports=True, expand=False) -> dict[ - MAPPING_TYPE, list[URIorCURIE]]: - """ - Get all mappings for a given element + def get_mappings( + self, element_name: ElementName = None, imports: bool = True, expand: bool = False + ) -> dict[MAPPING_TYPE, list[URIorCURIE]]: + """Get all mappings for a given element. :param element_name: the query element :param imports: include imports closure @@ -1213,14 +1328,14 @@ def get_mappings(self, element_name: ElementName = None, imports=True, expand=Fa e = self.get_element(element_name, imports=imports) if isinstance(e, ClassDefinition) or isinstance(e, SlotDefinition) or isinstance(e, TypeDefinition): m_dict = { - 'self': [self.get_uri(element_name, imports=imports, expand=False)], - 'native': [self.get_uri(element_name, imports=imports, expand=False, native=True)], - 'exact': e.exact_mappings, - 'narrow': e.narrow_mappings, - 'broad': e.broad_mappings, - 'related': e.related_mappings, - 'close': e.close_mappings, - 'undefined': e.mappings + "self": [self.get_uri(element_name, imports=imports, expand=False)], + "native": [self.get_uri(element_name, imports=imports, expand=False, native=True)], + "exact": e.exact_mappings, + "narrow": e.narrow_mappings, + "broad": e.broad_mappings, + "related": e.related_mappings, + "close": e.close_mappings, + "undefined": e.mappings, } else: m_dict = {} @@ -1231,24 +1346,23 @@ def get_mappings(self, element_name: ElementName = None, imports=True, expand=Fa return m_dict @lru_cache(None) - def is_mixin(self, element_name: Union[ElementName, Element]): - """ - Determines whether the given name is the name of a mixin - in the model. An element is a mixin if one of its properties is "is_mixin:true" + def is_mixin(self, element_name: ElementName | Element) -> bool: + """Determine whether the given name is the name of a mixin. + + An element is a mixin if one of its properties is "is_mixin:true" :param element_name: The name or alias of an element in the model :return: boolean """ - element = self.get_element(element_name) is_mixin = element.mixin if isinstance(element, Definition) else False return is_mixin @lru_cache(None) def inverse(self, slot_name: SlotDefinition): - """ - Determines whether the given name is a relationship, and if that relationship has an inverse, returns - the inverse. + """Determine whether the given name is a relationship, and return the inverse (if available). + + If that relationship has an inverse, returns the inverse. :param slot_name: The name or alias of an element in the model :return: inverse_name @@ -1267,14 +1381,18 @@ def get_element_by_mapping(self, mapping_id: URIorCURIE) -> list[str]: elements = self.all_elements() for el in elements: element = self.get_element(el) - mappings = element.exact_mappings + element.close_mappings + element.narrow_mappings + element.broad_mappings + mappings = ( + element.exact_mappings + element.close_mappings + element.narrow_mappings + element.broad_mappings + ) if mapping_id in mappings: model_elements.append(element.name) return model_elements - def get_mapping_index(self, imports=True, expand=False) -> dict[URIorCURIE, list[tuple[MAPPING_TYPE, Element]]]: - """ - Returns an index of all elements keyed by the mapping value. + def get_mapping_index( + self, imports: bool = True, expand: bool = False + ) -> dict[URIorCURIE, list[tuple[MAPPING_TYPE, Element]]]: + """Return an index of all elements keyed by the mapping value. + The index values are tuples of mapping type and element :param imports: @@ -1289,15 +1407,14 @@ def get_mapping_index(self, imports=True, expand=False) -> dict[URIorCURIE, list return ix @lru_cache(None) - def is_relationship(self, class_name: CLASS_NAME = None, imports=True) -> bool: - """ - Tests if a class represents a relationship or reified statement + def is_relationship(self, class_name: CLASS_NAME | None = None, imports: bool = True) -> bool: + """Test if a class represents a relationship or reified statement. :param class_name: :param imports: :return: true if the class represents a relationship """ - STMT_TYPES = ['rdf:Statement', 'owl:Axiom'] + STMT_TYPES = ["rdf:Statement", "owl:Axiom"] for an in self.class_ancestors(class_name, imports=imports): if self.get_uri(an) in STMT_TYPES: return True @@ -1308,9 +1425,8 @@ def is_relationship(self, class_name: CLASS_NAME = None, imports=True) -> bool: return False @lru_cache(None) - def annotation_dict(self, element_name: ElementName, imports=True) -> dict[URIorCURIE, Any]: - """ - Return a dictionary where keys are annotation tags and values are annotation values for any given element. + def annotation_dict(self, element_name: ElementName, imports: bool = True) -> dict[URIorCURIE, Any]: + """Return a dictionary where keys are annotation tags and values are annotation values for any given element. Note this will not include higher-order annotations @@ -1324,9 +1440,11 @@ def annotation_dict(self, element_name: ElementName, imports=True) -> dict[URIor return {k: v.value for k, v in e.annotations.items()} @lru_cache(None) - def class_slots(self, class_name: CLASS_NAME, imports=True, direct=False, attributes=True) -> list[ - SlotDefinitionName]: - """ + def class_slots( + self, class_name: CLASS_NAME, imports: bool = True, direct: bool = False, attributes: bool = True + ) -> list[SlotDefinitionName]: + """Return all slots for a class. + :param class_name: :param imports: include imports closure :param direct: only returns slots directly associated with a class (default is False) @@ -1350,9 +1468,15 @@ def class_slots(self, class_name: CLASS_NAME, imports=True, direct=False, attrib return slots_nr @lru_cache(None) - def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, imports=True, - mangle_name=False) -> SlotDefinition: - """ + def induced_slot( + self, + slot_name: SLOT_NAME, + class_name: CLASS_NAME | None = None, + imports: bool = True, + mangle_name: bool = False, + ) -> SlotDefinition: + """Generate a SlotDefinition with all properties materialized. + Given a slot, in the context of a particular class, yield a dynamic SlotDefinition that has all properties materialized. @@ -1386,8 +1510,10 @@ def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, impo slot = self.get_slot(slot_name, imports, attributes=True) if slot is None: - raise ValueError(f"No such slot {slot_name} as an attribute of {class_name} ancestors " - "or as a slot definition in the schema") + raise ValueError( + f"No such slot {slot_name} as an attribute of {class_name} ancestors " + "or as a slot definition in the schema" + ) # copy the slot, as it will be modified induced_slot = copy(slot) @@ -1400,8 +1526,8 @@ def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, impo if getattr(anc_slot, metaslot_name, None): setattr(induced_slot, metaslot_name, copy(getattr(anc_slot, metaslot_name))) COMBINE = { - 'maximum_value': lambda x, y: min(x, y), - 'minimum_value': lambda x, y: max(x, y), + "maximum_value": lambda x, y: min(x, y), + "minimum_value": lambda x, y: max(x, y), } # iterate through all metaslots, and potentially populate metaslot value for induced slot for metaslot_name in self._metaslots_for_slot(): @@ -1419,23 +1545,21 @@ def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, impo v2 = getattr(anc_slot_usage, metaslot_name, None) if v is None: v = v2 - else: - if metaslot_name in COMBINE: - if v2 is not None: - v = COMBINE[metaslot_name](v, v2) - else: - # can rewrite below as: - # 1. if v2: - # 2. if v2 is not None and - # ( - # (isinstance(v2, (dict, list)) and v2) or - # (isinstance(v2, JsonObj) and as_dict(v2)) - # ) - if not is_empty(v2): - v = v2 - logger.debug(f'{v} takes precedence over {v2} for {induced_slot.name}.{metaslot_name}') + elif metaslot_name in COMBINE: + if v2 is not None: + v = COMBINE[metaslot_name](v, v2) + # can rewrite below as: + # 1. if v2: + # 2. if v2 is not None and + # ( + # (isinstance(v2, (dict, list)) and v2) or + # (isinstance(v2, JsonObj) and as_dict(v2)) + # ) + elif not is_empty(v2): + v = v2 + logger.debug(f"{v} takes precedence over {v2} for {induced_slot.name}.{metaslot_name}") if v is None: - if metaslot_name == 'range': + if metaslot_name == "range": v = self.schema.default_range if v is not None: setattr(induced_slot, metaslot_name, v) @@ -1444,7 +1568,7 @@ def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, impo if slot.identifier or slot.key: slot.required = True if mangle_name: - mangled_name = f'{camelcase(class_name)}__{underscore(slot_name)}' + mangled_name = f"{camelcase(class_name)}__{underscore(slot_name)}" induced_slot.name = mangled_name if not induced_slot.alias: induced_slot.alias = underscore(slot_name) @@ -1456,13 +1580,12 @@ def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, impo @lru_cache(None) def _metaslots_for_slot(self): - fake_slot = SlotDefinition('__FAKE') + fake_slot = SlotDefinition("__FAKE") return vars(fake_slot).keys() @lru_cache(None) - def class_induced_slots(self, class_name: CLASS_NAME = None, imports=True) -> list[SlotDefinition]: - """ - All slots that are asserted or inferred for a class, with their inferred semantics + def class_induced_slots(self, class_name: CLASS_NAME | None = None, imports: bool = True) -> list[SlotDefinition]: + """Retrieve all slots that are asserted or inferred for a class, with their inferred semantics. :param class_name: :param imports: @@ -1471,9 +1594,8 @@ def class_induced_slots(self, class_name: CLASS_NAME = None, imports=True) -> li return [self.induced_slot(sn, class_name, imports=imports) for sn in self.class_slots(class_name)] @lru_cache(None) - def induced_class(self, class_name: CLASS_NAME = None) -> ClassDefinition: - """ - Generate an induced class + def induced_class(self, class_name: CLASS_NAME | None = None) -> ClassDefinition: + """Generate an induced class. - the class will have no slots - the class will have one attribute per `class_induced_slots` @@ -1491,8 +1613,8 @@ def induced_class(self, class_name: CLASS_NAME = None) -> ClassDefinition: return c @lru_cache(None) - def induced_type(self, type_name: TYPE_NAME = None) -> TypeDefinition: - """ + def induced_type(self, type_name: TYPE_NAME | None = None) -> TypeDefinition: + """Generate an induced type. :param type_name: :return: @@ -1509,23 +1631,21 @@ def induced_type(self, type_name: TYPE_NAME = None) -> TypeDefinition: return t @lru_cache(None) - def induced_enum(self, enum_name: ENUM_NAME = None) -> EnumDefinition: - """ + def induced_enum(self, enum_name: ENUM_NAME | None = None) -> EnumDefinition: + """Generate an induced enum. :param enum_name: :return: """ - e = deepcopy(self.get_enum(enum_name)) - return e + return deepcopy(self.get_enum(enum_name)) @lru_cache(None) - def get_identifier_slot(self, cn: CLASS_NAME, use_key=False, imports=True) -> Optional[SlotDefinition]: - """ - Find the slot that is the identifier for the given class + def get_identifier_slot(self, cn: CLASS_NAME, use_key: bool = False, imports: bool = True) -> SlotDefinition | None: + """Retrieve the slot that is the identifier for the given class. :param cn: class name :param imports: - :return: name of slot that acts as identifier + :return: slot that acts as identifier, if present """ for sn in self.class_slots(cn, imports=imports): s = self.induced_slot(sn, cn, imports=imports) @@ -1533,17 +1653,15 @@ def get_identifier_slot(self, cn: CLASS_NAME, use_key=False, imports=True) -> Op return s if use_key: return self.get_key_slot(cn, imports=imports) - else: - return None + return None @lru_cache(None) - def get_key_slot(self, cn: CLASS_NAME, imports=True) -> Optional[SlotDefinition]: - """ - Find the slot that is the key for the given class + def get_key_slot(self, cn: CLASS_NAME, imports: bool = True) -> SlotDefinition | None: + """Retrieve the slot that is the key for the given class. :param cn: class name :param imports: - :return: name of slot that acts as key + :return: slot that acts as key, if present """ for sn in self.class_slots(cn, imports=imports): s = self.induced_slot(sn, cn, imports=imports) @@ -1552,8 +1670,9 @@ def get_key_slot(self, cn: CLASS_NAME, imports=True) -> Optional[SlotDefinition] return None @lru_cache(None) - def get_type_designator_slot(self, cn: CLASS_NAME, imports=True) -> Optional[SlotDefinition]: - """ + def get_type_designator_slot(self, cn: CLASS_NAME, imports: bool = True) -> SlotDefinition | None: + """Get the type designator slot for a class. + :param cn: class name :param imports: :return: name of slot that acts as type designator for the given class @@ -1564,9 +1683,8 @@ def get_type_designator_slot(self, cn: CLASS_NAME, imports=True) -> Optional[Slo return s return None - def is_inlined(self, slot: SlotDefinition, imports=True) -> bool: - """ - True if slot is inferred or asserted inline + def is_inlined(self, slot: SlotDefinition, imports: bool = True) -> bool: + """Return true if slot is inferred or asserted inline. :param slot: :param imports: @@ -1574,24 +1692,20 @@ def is_inlined(self, slot: SlotDefinition, imports=True) -> bool: """ range = slot.range if range in self.all_classes(): - if slot.inlined: - return True - elif slot.inlined_as_list: + if slot.inlined or slot.inlined_as_list: return True id_slot = self.get_identifier_slot(range, imports=imports) if id_slot is None: # must be inlined as has no identifier return True - else: - # not explicitly declared inline and has an identifier: assume is ref, not inlined - return False - else: + # not explicitly declared inline and has an identifier: assume is ref, not inlined return False + return False def slot_applicable_range_elements(self, slot: SlotDefinition) -> list[ClassDefinitionName]: - """ - Returns all applicable metamodel elements for a slot range + """Retrieve all applicable metamodel elements for a slot range. + (metamodel class names returned: class_definition, enum_definition, type_definition) Typically any given slot has exactly one range, and one metamodel element type, @@ -1608,19 +1722,18 @@ def slot_applicable_range_elements(self, slot: SlotDefinition) -> list[ClassDefi if r in self.all_classes(): range_types.append(ClassDefinition.class_name) rc = self.get_class(r) - if rc.class_uri == 'linkml:Any': + if rc.class_uri == "linkml:Any": is_any = True if is_any or r in self.all_enums(): range_types.append(EnumDefinition.class_name) if is_any or r in self.all_types(): range_types.append(TypeDefinition.class_name) if not range_types: - raise ValueError(f'Unrecognized range: {r}') + raise ValueError(f"Unrecognized range: {r}") return range_types def slot_range_as_union(self, slot: SlotDefinition) -> list[ElementName]: - """ - Returns all applicable ranges for a slot + """Retrieve all applicable ranges for a slot. Typically, any given slot has exactly one range, and one metamodel element type, but a proposed feature in LinkML 1.2 is range expressions, where ranges can be defined as unions @@ -1635,9 +1748,7 @@ def slot_range_as_union(self, slot: SlotDefinition) -> list[ElementName]: range_union_of.append(x.range) return range_union_of - def get_classes_by_slot( - self, slot: SlotDefinition, include_induced: bool = False - ) -> list[ClassDefinitionName]: + def get_classes_by_slot(self, slot: SlotDefinition, include_induced: bool = False) -> list[ClassDefinitionName]: """Get all classes that use a given slot, either as a direct or induced slot. :param slot: slot in consideration @@ -1653,9 +1764,7 @@ def get_classes_by_slot( if include_induced: for c_name in all_classes: - induced_slot_names = [ - ind_slot.name for ind_slot in self.class_induced_slots(c_name) - ] + induced_slot_names = [ind_slot.name for ind_slot in self.class_induced_slots(c_name)] if slot.name in induced_slot_names: classes_set.add(c_name) @@ -1694,9 +1803,8 @@ def get_classes_modifying_slot(self, slot: SlotDefinition) -> list[ClassDefiniti return modifying_classes - def is_slot_percent_encoded(self, slot: SlotDefinitionName) -> bool: - """ - True if slot or its range is has a percent_encoded annotation. + def is_slot_percent_encoded(self, slot: SlotDefinitionName) -> bool | None: + """Return true if slot or its range is has a percent_encoded annotation. This is true for type fields that are the range of identifier columns, where the identifier is not guaranteed to be a valid URI or CURIE @@ -1711,15 +1819,15 @@ def is_slot_percent_encoded(self, slot: SlotDefinitionName) -> bool: for t in id_slot_ranges: anns = self.get_type(t).annotations return "percent_encoded" in anns + return None @lru_cache(None) def usage_index(self) -> dict[ElementName, list[SchemaUsage]]: - """ - Fetch an index that shows the ways in which each element is used + """Fetch an index that shows the ways in which each element is used. :return: dictionary of SchemaUsages keyed by used elements """ - ROLES = ['domain', 'range', 'any_of', 'exactly_one_of', 'none_of', 'all_of'] + ROLES = ["domain", "range", "any_of", "exactly_one_of", "none_of", "all_of"] ix = defaultdict(list) for cn, c in self.all_classes().items(): direct_slots = c.slots @@ -1745,7 +1853,8 @@ def usage_index(self) -> dict[ElementName, list[SchemaUsage]]: # MUTATION OPERATIONS def add_class(self, cls: ClassDefinition) -> None: - """ + """Add a class to the schema. + :param cls: class to be added :return: """ @@ -1753,7 +1862,8 @@ def add_class(self, cls: ClassDefinition) -> None: self.set_modified() def add_slot(self, slot: SlotDefinition) -> None: - """ + """Add a slot to the schema. + :param slot: slot to be added :return: """ @@ -1761,31 +1871,35 @@ def add_slot(self, slot: SlotDefinition) -> None: self.set_modified() def add_enum(self, enum: EnumDefinition) -> None: - """ + """Add an enum to the schema. + :param enum: enum to be added :return: """ self.schema.enums[enum.name] = enum self.set_modified() - def add_type(self, type: TypeDefinition) -> None: - """ + def add_type(self, type_def: TypeDefinition) -> None: + """Add a type to the schema. + :param type: type to be added :return: """ - self.schema.types[type.name] = type + self.schema.types[type_def.name] = type_def self.set_modified() def add_subset(self, subset: SubsetDefinition) -> None: - """ + """Add a subset to the schema. + :param subset: subset to be added :return: """ - self.schema.subsets[subset.name] = type + self.schema.subsets[subset.name] = subset self.set_modified() - def delete_class(self, class_name: ClassDefinitionName, delete_references=True) -> None: - """ + def delete_class(self, class_name: ClassDefinitionName, delete_references: bool = True) -> None: + """Delete a class from the schema. + :param class_name: class to be deleted :return: """ @@ -1802,7 +1916,8 @@ def delete_class(self, class_name: ClassDefinitionName, delete_references=True) self.set_modified() def delete_slot(self, slot_name: SlotDefinitionName) -> None: - """ + """Delete a slot from the schema. + :param slot_name: slot to be deleted :return: """ @@ -1810,7 +1925,8 @@ def delete_slot(self, slot_name: SlotDefinitionName) -> None: self.set_modified() def delete_enum(self, enum_name: EnumDefinitionName) -> None: - """ + """Delete an enum from the schema. + :param enum_name: enum to be deleted :return: """ @@ -1818,7 +1934,8 @@ def delete_enum(self, enum_name: EnumDefinitionName) -> None: self.set_modified() def delete_type(self, type_name: TypeDefinitionName) -> None: - """ + """Delete a type from the schema. + :param type_name: type to be deleted :return: """ @@ -1826,7 +1943,8 @@ def delete_type(self, type_name: TypeDefinitionName) -> None: self.set_modified() def delete_subset(self, subset_name: SubsetDefinitionName) -> None: - """ + """Delete a subset from the schema. + :param subset_name: subset to be deleted :return: """ @@ -1834,11 +1952,10 @@ def delete_subset(self, subset_name: SubsetDefinitionName) -> None: self.set_modified() # def rename(self, old_name: str, new_name: str): - # todo: add to runtime + # TODO: add to runtime - def merge_schema(self, schema: SchemaDefinition, clobber=False) -> None: - """ - merges another schema into this one. + def merge_schema(self, schema: SchemaDefinition, clobber: bool = False) -> None: + """Merge another schema into this one. If the other schema has an element with the same name as an element in this schema, then this element is NOT copied. @@ -1867,12 +1984,8 @@ def merge_schema(self, schema: SchemaDefinition, clobber=False) -> None: dest.subsets[k] = copy(v) self.set_modified() - def merge_imports(self): - """ - Merges the full imports closure - - :return: - """ + def merge_imports(self) -> None: + """Merge the full imports closure.""" schema = self.schema to_merge = [s2 for s2 in self.all_schema(imports=True) if s2 != schema] for s2 in to_merge: @@ -1880,21 +1993,26 @@ def merge_imports(self): schema.imports = [] self.set_modified() - def copy_schema(self, new_name: str = None) -> SchemaDefinition: + def copy_schema(self, new_name: str | None = None) -> SchemaDefinition: + """Generate a copy of the schema. + + :param new_name: name for the new schema, defaults to None + :type new_name: str | None, optional + :return: copied SchemaDefinition, optionally with new name + :rtype: SchemaDefinition + """ s2 = copy(self.schema) if new_name is not None: s2.name = new_name return s2 def set_modified(self) -> None: + """Increase the number of schema modifications by 1.""" self._hash = None self.modifications += 1 def materialize_patterns(self) -> None: - """Materialize schema by expanding structured patterns - into regular expressions based on composite patterns - provided in the settings dictionary. - """ + """Materialize schema by expanding structured patterns into regular expressions based on composite patterns provided in the settings dictionary.""" resolver = PatternResolver(self) def materialize_pattern_into_slot_definition(slot_definition: SlotDefinition) -> None: @@ -1916,7 +2034,7 @@ def materialize_pattern_into_slot_definition(slot_definition: SlotDefinition) -> materialize_pattern_into_slot_definition(slot_definition) def materialize_derived_schema(self) -> SchemaDefinition: - """ Materialize a schema view into a schema definition """ + """Materialize a schema view into a schema definition.""" derived_schema = deepcopy(self.schema) derived_schemaview = SchemaView(derived_schema) derived_schemaview.merge_imports() diff --git a/tests/test_index/test_object_index.py b/tests/test_index/test_object_index.py index ea2d5e6e..5736da43 100644 --- a/tests/test_index/test_object_index.py +++ b/tests/test_index/test_object_index.py @@ -1,7 +1,7 @@ import os import unittest -from linkml_runtime import SchemaView +from linkml_runtime.utils.schemaview import SchemaView from linkml_runtime.loaders import yaml_loader import tests.test_index.model.container_test as src_dm from linkml_runtime.index.object_index import ObjectIndex, ProxyObject diff --git a/tests/test_issues/test_linkml_runtime_issue_1317.py b/tests/test_issues/test_linkml_runtime_issue_1317.py index 1ca6698e..cca9b5ff 100644 --- a/tests/test_issues/test_linkml_runtime_issue_1317.py +++ b/tests/test_issues/test_linkml_runtime_issue_1317.py @@ -1,5 +1,5 @@ import unittest -from linkml_runtime import SchemaView +from linkml_runtime.utils.schemaview import SchemaView URL = ("https://raw.githubusercontent.com/linkml/linkml-runtime/" "2a46c65fe2e7db08e5e524342e5ff2ffb94bec92/tests/test_utils/input/kitchen_sink.yaml") diff --git a/tests/test_utils/test_schemaview.py b/tests/test_utils/test_schemaview.py index 01ea81d6..a6fe6b31 100644 --- a/tests/test_utils/test_schemaview.py +++ b/tests/test_utils/test_schemaview.py @@ -1,61 +1,77 @@ -import os +"""Tests of the SchemaView package.""" + import logging +import os from copy import copy from pathlib import Path + import pytest from jsonasobj2 import JsonObj from linkml_runtime.dumpers import yaml_dumper -from linkml_runtime.linkml_model.meta import Example, SchemaDefinition, ClassDefinition, SlotDefinitionName, \ - SlotDefinition, \ - ClassDefinitionName, Prefix, TypeDefinition +from linkml_runtime.linkml_model.meta import ( + ClassDefinition, + ClassDefinitionName, + Example, + Prefix, + SchemaDefinition, + SlotDefinition, + SlotDefinitionName, + TypeDefinition, +) from linkml_runtime.loaders.yaml_loader import YAMLLoader from linkml_runtime.utils.introspection import package_schemaview -from linkml_runtime.utils.schemaview import SchemaView, SchemaUsage, OrderedBy -from linkml_runtime.utils.schemaops import roll_up, roll_down +from linkml_runtime.utils.schemaops import roll_down, roll_up +from linkml_runtime.utils.schemaview import SchemaUsage, SchemaView from tests.test_utils import INPUT_DIR logger = logging.getLogger(__name__) -SCHEMA_NO_IMPORTS = Path(INPUT_DIR) / 'kitchen_sink_noimports.yaml' -SCHEMA_WITH_IMPORTS = Path(INPUT_DIR) / 'kitchen_sink.yaml' -SCHEMA_WITH_STRUCTURED_PATTERNS = Path(INPUT_DIR) / 'pattern-example.yaml' -SCHEMA_IMPORT_TREE = Path(INPUT_DIR) / 'imports' / 'main.yaml' -SCHEMA_RELATIVE_IMPORT_TREE = Path(INPUT_DIR) / 'imports_relative' / 'L0_0' / 'L1_0_0' / 'main.yaml' -SCHEMA_RELATIVE_IMPORT_TREE2 = Path(INPUT_DIR) / 'imports_relative' / 'L0_2' / 'main.yaml' +SCHEMA_NO_IMPORTS = Path(INPUT_DIR) / "kitchen_sink_noimports.yaml" +SCHEMA_WITH_IMPORTS = Path(INPUT_DIR) / "kitchen_sink.yaml" +SCHEMA_WITH_STRUCTURED_PATTERNS = Path(INPUT_DIR) / "pattern-example.yaml" +SCHEMA_IMPORT_TREE = Path(INPUT_DIR) / "imports" / "main.yaml" +SCHEMA_RELATIVE_IMPORT_TREE = Path(INPUT_DIR) / "imports_relative" / "L0_0" / "L1_0_0" / "main.yaml" +SCHEMA_RELATIVE_IMPORT_TREE2 = Path(INPUT_DIR) / "imports_relative" / "L0_2" / "main.yaml" yaml_loader = YAMLLoader() -IS_CURRENT = 'is current' -EMPLOYED_AT = 'employed at' -COMPANY = 'Company' -AGENT = 'agent' -ACTIVITY = 'activity' -RELATED_TO = 'related to' -AGE_IN_YEARS = 'age in years' +IS_CURRENT = "is current" +EMPLOYED_AT = "employed at" +COMPANY = "Company" +AGENT = "agent" +ACTIVITY = "activity" +RELATED_TO = "related to" +AGE_IN_YEARS = "age in years" + @pytest.fixture -def schema_view_no_imports(): +def schema_view_no_imports() -> SchemaView: + """Fixture for SchemaView with a schema with no imports.""" return SchemaView(SCHEMA_NO_IMPORTS) @pytest.fixture -def view(): +def schema_view_with_imports() -> SchemaView: """Fixture for SchemaView with imports.""" return SchemaView(SCHEMA_WITH_IMPORTS) -def test_children_method(schema_view_no_imports): +@pytest.fixture(scope="session") +def schema_view_attributes() -> SchemaView: + """Fixture for a SchemaView for testing attribute edge cases.""" + return SchemaView(os.path.join(INPUT_DIR, "attribute_edge_cases.yaml")) + + +def test_children_method(schema_view_no_imports: SchemaView) -> None: + """Test retrieval of the children of a class.""" view = schema_view_no_imports children = view.get_children("Person") - assert children == ['Adult'] + assert children == ["Adult"] -def test_all_aliases(schema_view_no_imports): - """ - This tests the aliases slot (not: alias) - :return: - """ +def test_all_aliases(schema_view_no_imports: SchemaView) -> None: + """Test the aliases slot (not: alias).""" view = schema_view_no_imports aliases = view.all_aliases() assert "identifier" in aliases["id"] @@ -63,9 +79,9 @@ def test_all_aliases(schema_view_no_imports): assert "B" in aliases["subset B"] assert "dad" in aliases["Adult"] -def test_alias_slot(schema_view_no_imports): - """ - Tests the alias slot. + +def test_alias_slot(schema_view_no_imports: SchemaView) -> None: + """Tests the alias slot. The induced slot alias should always be populated. For induced slots, it should default to the name field if not present. @@ -76,12 +92,13 @@ def test_alias_slot(schema_view_no_imports): for s in view.class_induced_slots(c.name): assert s.alias is not None # Assert that alias is not None - postal_code_slot = view.induced_slot('postal code', 'Address') - assert postal_code_slot.name == 'postal code' # Assert name is 'postal code' - assert postal_code_slot.alias == 'zip' # Assert alias is 'zip' + postal_code_slot = view.induced_slot("postal code", "Address") + assert postal_code_slot.name == "postal code" # Assert name is 'postal code' + assert postal_code_slot.alias == "zip" # Assert alias is 'zip' -def test_schemaview_enums(schema_view_no_imports): +def test_schemaview_enums(schema_view_no_imports: SchemaView) -> None: + """Test various aspects of Enum representation.""" view = schema_view_no_imports # Test for ValueError when passing incorrect parameters @@ -93,7 +110,7 @@ def test_schemaview_enums(schema_view_no_imports): for pv, v in e.permissible_values.items(): if pv == "CAT": assert view.permissible_value_parent(pv, e.name) is None - assert view.permissible_value_ancestors(pv, e.name) == ['CAT'] + assert view.permissible_value_ancestors(pv, e.name) == ["CAT"] assert "LION" in view.permissible_value_descendants(pv, e.name) assert "ANGRY_LION" in view.permissible_value_descendants(pv, e.name) assert "TABBY" in view.permissible_value_descendants(pv, e.name) @@ -105,28 +122,29 @@ def test_schemaview_enums(schema_view_no_imports): assert "ANGRY_LION" in view.permissible_value_children(pv, e.name) if pv == "ANGRY_LION": - assert view.permissible_value_parent(pv, e.name) == ['LION'] - assert view.permissible_value_ancestors(pv, e.name) == ['ANGRY_LION', 'LION', 'CAT'] + assert view.permissible_value_parent(pv, e.name) == ["LION"] + assert view.permissible_value_ancestors(pv, e.name) == ["ANGRY_LION", "LION", "CAT"] assert view.permissible_value_descendants(pv, e.name) == ["ANGRY_LION"] for cn, c in view.all_classes().items(): if c.name == "Adult": - assert view.class_ancestors(c.name) == ['Adult', 'Person', 'HasAliases', 'Thing'] + assert view.class_ancestors(c.name) == ["Adult", "Person", "HasAliases", "Thing"] -def test_schemaview(schema_view_no_imports): +def test_schemaview(schema_view_no_imports: SchemaView) -> None: + """General SchemaView tests.""" view = schema_view_no_imports logger.debug(view.imports_closure()) assert len(view.imports_closure()) == 1 all_cls = view.all_classes() - logger.debug(f'n_cls = {len(all_cls)}') + logger.debug(f"n_cls = {len(all_cls)}") - assert list(view.annotation_dict(IS_CURRENT).values()) == ['bar'] + assert list(view.annotation_dict(IS_CURRENT).values()) == ["bar"] logger.debug(view.annotation_dict(EMPLOYED_AT)) e = view.get_element(EMPLOYED_AT) logger.debug(e.annotations) - e = view.get_element('has employment history') + e = view.get_element("has employment history") logger.debug(e.annotations) elements = view.get_elements_applicable_by_identifier("ORCID:1234") @@ -138,18 +156,18 @@ def test_schemaview(schema_view_no_imports): elements = view.get_elements_applicable_by_identifier("TEST:1234") assert "anatomical entity" not in elements - assert list(view.annotation_dict(SlotDefinitionName(IS_CURRENT)).values()) == ['bar'] + assert list(view.annotation_dict(SlotDefinitionName(IS_CURRENT)).values()) == ["bar"] logger.debug(view.annotation_dict(SlotDefinitionName(EMPLOYED_AT))) element = view.get_element(SlotDefinitionName(EMPLOYED_AT)) logger.debug(element.annotations) - element = view.get_element(SlotDefinitionName('has employment history')) + element = view.get_element(SlotDefinitionName("has employment history")) logger.debug(element.annotations) - assert view.is_mixin('WithLocation') - assert not view.is_mixin('BirthEvent') + assert view.is_mixin("WithLocation") + assert not view.is_mixin("BirthEvent") - assert view.inverse('employment history of') == 'has employment history' - assert view.inverse('has employment history') == 'employment history of' + assert view.inverse("employment history of") == "has employment history" + assert view.inverse("has employment history") == "employment history of" mapping = view.get_mapping_index() assert mapping is not None @@ -157,229 +175,244 @@ def test_schemaview(schema_view_no_imports): category_mapping = view.get_element_by_mapping("GO:0005198") assert category_mapping == [ACTIVITY] - assert view.is_multivalued("aliases") assert not view.is_multivalued("id") assert view.is_multivalued("dog addresses") - assert view.slot_is_true_for_metadata_property('aliases', 'multivalued') - assert view.slot_is_true_for_metadata_property('id', 'identifier') + assert view.slot_is_true_for_metadata_property("aliases", "multivalued") + assert view.slot_is_true_for_metadata_property("id", "identifier") with pytest.raises(ValueError): - view.slot_is_true_for_metadata_property('aliases', 'aliases') + view.slot_is_true_for_metadata_property("aliases", "aliases") for tn, t in view.all_types().items(): - logger.info(f'TN = {tn}') - assert t.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' + logger.info(f"TN = {tn}") + assert t.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" for sn, s in view.all_slots().items(): - logger.info(f'SN = {sn} RANGE={s.range}') - assert s.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' + logger.info(f"SN = {sn} RANGE={s.range}") + assert s.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" rng = view.induced_slot(sn).range assert rng is not None for cn in all_cls.keys(): c = view.get_class(cn) - assert c.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' - logger.debug(f'{cn} PARENTS = {view.class_parents(cn)}') - logger.debug(f'{cn} ANCS = {view.class_ancestors(cn)}') - logger.debug(f'{cn} CHILDREN = {view.class_children(cn)}') - logger.debug(f'{cn} DESCS = {view.class_descendants(cn)}') - logger.debug(f'{cn} SCHEMA = {view.in_schema(cn)}') - logger.debug(f' SLOTS = {view.class_slots(cn)}') + assert c.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" + logger.debug(f"{cn} PARENTS = {view.class_parents(cn)}") + logger.debug(f"{cn} ANCS = {view.class_ancestors(cn)}") + logger.debug(f"{cn} CHILDREN = {view.class_children(cn)}") + logger.debug(f"{cn} DESCS = {view.class_descendants(cn)}") + logger.debug(f"{cn} SCHEMA = {view.in_schema(cn)}") + logger.debug(f" SLOTS = {view.class_slots(cn)}") for sn in view.class_slots(cn): slot = view.get_slot(sn) - assert slot.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' - logger.debug(f' SLOT {sn} R: {slot.range} U: {view.get_uri(sn)} ANCS: {view.slot_ancestors(sn)}') + assert slot.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" + logger.debug(f" SLOT {sn} R: {slot.range} U: {view.get_uri(sn)} ANCS: {view.slot_ancestors(sn)}") induced_slot = view.induced_slot(sn, cn) - logger.debug(f' INDUCED {sn}={induced_slot}') + logger.debug(f" INDUCED {sn}={induced_slot}") assert induced_slot.range is not None - logger.debug(f'ALL = {view.all_elements().keys()}') + logger.debug(f"ALL = {view.all_elements().keys()}") # -- TEST ANCESTOR/DESCENDANTS FUNCTIONS -- - assert set(view.class_ancestors(COMPANY)) == {'Company', 'Organization', 'HasAliases', 'Thing'} - assert set(view.class_ancestors(COMPANY, reflexive=False)) == {'Organization', 'HasAliases', 'Thing'} - assert set(view.class_descendants('Thing')) == {'Thing', 'Person', 'Organization', COMPANY, 'Adult'} + assert set(view.class_ancestors(COMPANY)) == {"Company", "Organization", "HasAliases", "Thing"} + assert set(view.class_ancestors(COMPANY, reflexive=False)) == {"Organization", "HasAliases", "Thing"} + assert set(view.class_descendants("Thing")) == {"Thing", "Person", "Organization", COMPANY, "Adult"} # -- TEST CLASS SLOTS -- - assert set(view.class_slots('Person')) == { - 'id', 'name', 'has employment history', 'has familial relationships', - 'has medical history', AGE_IN_YEARS, 'addresses', 'has birth event', - 'reason_for_happiness', 'aliases' - } - assert view.class_slots('Person') == view.class_slots('Adult') - assert set(view.class_slots(COMPANY)) == { - 'id', 'name', 'ceo', 'aliases' + assert set(view.class_slots("Person")) == { + "id", + "name", + "has employment history", + "has familial relationships", + "has medical history", + AGE_IN_YEARS, + "addresses", + "has birth event", + "reason_for_happiness", + "aliases", } + assert view.class_slots("Person") == view.class_slots("Adult") + assert set(view.class_slots(COMPANY)) == {"id", "name", "ceo", "aliases"} - assert view.get_class(AGENT).class_uri == 'prov:Agent' - assert view.get_uri(AGENT) == 'prov:Agent' + assert view.get_class(AGENT).class_uri == "prov:Agent" + assert view.get_uri(AGENT) == "prov:Agent" logger.debug(view.get_class(COMPANY).class_uri) - assert view.get_uri(COMPANY) == 'ks:Company' + assert view.get_uri(COMPANY) == "ks:Company" # test induced slots - for c in [COMPANY, 'Person', 'Organization']: - islot = view.induced_slot('aliases', c) + for c in [COMPANY, "Person", "Organization"]: + islot = view.induced_slot("aliases", c) assert islot.multivalued is True assert islot.owner == c - assert view.get_uri(islot, expand=True) == 'https://w3id.org/linkml/tests/kitchen_sink/aliases' - - assert view.get_identifier_slot('Company').name == 'id' - assert view.get_identifier_slot('Thing').name == 'id' - assert view.get_identifier_slot('FamilialRelationship') is None - - for c in [COMPANY, 'Person', 'Organization', 'Thing']: - assert view.induced_slot('id', c).identifier - assert not view.induced_slot('name', c).identifier - assert not view.induced_slot('name', c).required - assert view.induced_slot('name', c).range == 'string' - assert view.induced_slot('id', c).owner == c - assert view.induced_slot('name', c).owner == c - - for c in ['Event', 'EmploymentEvent', 'MedicalEvent']: - s = view.induced_slot('started at time', c) - logger.debug(f's={s.range} // c = {c}') - assert s.range == 'date' - assert s.slot_uri == 'prov:startedAtTime' + assert view.get_uri(islot, expand=True) == "https://w3id.org/linkml/tests/kitchen_sink/aliases" + + assert view.get_identifier_slot("Company").name == "id" + assert view.get_identifier_slot("Thing").name == "id" + assert view.get_identifier_slot("FamilialRelationship") is None + + for c in [COMPANY, "Person", "Organization", "Thing"]: + assert view.induced_slot("id", c).identifier + assert not view.induced_slot("name", c).identifier + assert not view.induced_slot("name", c).required + assert view.induced_slot("name", c).range == "string" + assert view.induced_slot("id", c).owner == c + assert view.induced_slot("name", c).owner == c + + for c in ["Event", "EmploymentEvent", "MedicalEvent"]: + s = view.induced_slot("started at time", c) + logger.debug(f"s={s.range} // c = {c}") + assert s.range == "date" + assert s.slot_uri == "prov:startedAtTime" assert s.owner == c c_induced = view.induced_class(c) assert c_induced.slots == [] assert c_induced.attributes != [] - s2 = c_induced.attributes['started at time'] - assert s2.range == 'date' - assert s2.slot_uri == 'prov:startedAtTime' + s2 = c_induced.attributes["started at time"] + assert s2.range == "date" + assert s2.slot_uri == "prov:startedAtTime" # test slot_usage - assert view.induced_slot(AGE_IN_YEARS, 'Person').minimum_value == 0 - assert view.induced_slot(AGE_IN_YEARS, 'Adult').minimum_value == 16 - assert view.induced_slot('name', 'Person').pattern is not None - assert view.induced_slot('type', 'FamilialRelationship').range == 'FamilialRelationshipType' - assert view.induced_slot(RELATED_TO, 'FamilialRelationship').range == 'Person' - assert view.get_slot(RELATED_TO).range == 'Thing' - assert view.induced_slot(RELATED_TO, 'Relationship').range == 'Thing' - assert set(view.induced_slot('name').domain_of) == {'Thing', 'Place'} + assert view.induced_slot(AGE_IN_YEARS, "Person").minimum_value == 0 + assert view.induced_slot(AGE_IN_YEARS, "Adult").minimum_value == 16 + assert view.induced_slot("name", "Person").pattern is not None + assert view.induced_slot("type", "FamilialRelationship").range == "FamilialRelationshipType" + assert view.induced_slot(RELATED_TO, "FamilialRelationship").range == "Person" + assert view.get_slot(RELATED_TO).range == "Thing" + assert view.induced_slot(RELATED_TO, "Relationship").range == "Thing" + assert set(view.induced_slot("name").domain_of) == {"Thing", "Place"} a = view.get_class(ACTIVITY) - assert set(a.exact_mappings) == {'prov:Activity'} + assert set(a.exact_mappings) == {"prov:Activity"} logger.debug(view.get_mappings(ACTIVITY, expand=True)) - assert set(view.get_mappings(ACTIVITY)['exact']) == {'prov:Activity'} - assert set(view.get_mappings(ACTIVITY, expand=True)['exact']) == {'http://www.w3.org/ns/prov#Activity'} + assert set(view.get_mappings(ACTIVITY)["exact"]) == {"prov:Activity"} + assert set(view.get_mappings(ACTIVITY, expand=True)["exact"]) == {"http://www.w3.org/ns/prov#Activity"} u = view.usage_index() for k, v in u.items(): - logger.debug(f' {k} = {v}') - - assert SchemaUsage(used_by='FamilialRelationship', slot=RELATED_TO, - metaslot='range', used='Person', inferred=False) in u['Person'] - assert [SchemaUsage(used_by='Person', - slot='reason_for_happiness', - metaslot='any_of[range]', - used='MarriageEvent', - inferred=True - ), - SchemaUsage(used_by='Adult', - slot='reason_for_happiness', - metaslot='any_of[range]', - used='MarriageEvent', - inferred=False - )] == u['MarriageEvent'] - assert [SchemaUsage(used_by='Person', - slot='has employment history', - metaslot='range', - used='EmploymentEvent', - inferred=True), - SchemaUsage(used_by='Person', - slot='reason_for_happiness', - metaslot='any_of[range]', - used='EmploymentEvent', - inferred=True), - SchemaUsage(used_by='Adult', - slot='has employment history', - metaslot='range', - used='EmploymentEvent', - inferred=False), - SchemaUsage(used_by='Adult', - slot='reason_for_happiness', - metaslot='any_of[range]', - used='EmploymentEvent', - inferred=False)] == u['EmploymentEvent'] + logger.debug(f" {k} = {v}") + + assert ( + SchemaUsage( + used_by="FamilialRelationship", slot=RELATED_TO, metaslot="range", used="Person", inferred=False + ) + in u["Person"] + ) + assert [ + SchemaUsage( + used_by="Person", + slot="reason_for_happiness", + metaslot="any_of[range]", + used="MarriageEvent", + inferred=True, + ), + SchemaUsage( + used_by="Adult", + slot="reason_for_happiness", + metaslot="any_of[range]", + used="MarriageEvent", + inferred=False, + ), + ] == u["MarriageEvent"] + assert [ + SchemaUsage( + used_by="Person", slot="has employment history", metaslot="range", used="EmploymentEvent", inferred=True + ), + SchemaUsage( + used_by="Person", + slot="reason_for_happiness", + metaslot="any_of[range]", + used="EmploymentEvent", + inferred=True, + ), + SchemaUsage( + used_by="Adult", slot="has employment history", metaslot="range", used="EmploymentEvent", inferred=False + ), + SchemaUsage( + used_by="Adult", + slot="reason_for_happiness", + metaslot="any_of[range]", + used="EmploymentEvent", + inferred=False, + ), + ] == u["EmploymentEvent"] # test methods also work for attributes leaves = view.class_leaves() - logger.debug(f'LEAVES={leaves}') - assert 'MedicalEvent' in leaves + logger.debug(f"LEAVES={leaves}") + assert "MedicalEvent" in leaves roots = view.class_roots() - logger.debug(f'ROOTS={roots}') - assert 'Dataset' in roots - ds_slots = view.class_slots('Dataset') + logger.debug(f"ROOTS={roots}") + assert "Dataset" in roots + ds_slots = view.class_slots("Dataset") logger.debug(ds_slots) assert len(ds_slots) == 3 - assert len(['persons', 'companies', 'activities']) == len(ds_slots) + assert len(["persons", "companies", "activities"]) == len(ds_slots) for sn in ds_slots: - s = view.induced_slot(sn, 'Dataset') + s = view.induced_slot(sn, "Dataset") logger.debug(s) -def test_rollup_rolldown(schema_view_no_imports): + +def test_rollup_rolldown(schema_view_no_imports: SchemaView) -> None: + """Test rolling up and rolling down.""" # no import schema view = schema_view_no_imports - element_name = 'Event' + element_name = "Event" roll_up(view, element_name) for slot in view.class_induced_slots(element_name): logger.debug(slot) induced_slot_names = [s.name for s in view.class_induced_slots(element_name)] logger.debug(induced_slot_names) - assert len(['started at time', 'ended at time', IS_CURRENT, 'in location', EMPLOYED_AT, 'married to']) == len(induced_slot_names) + assert len(["started at time", "ended at time", IS_CURRENT, "in location", EMPLOYED_AT, "married to"]) == len( + induced_slot_names + ) # check to make sure rolled-up classes are deleted assert view.class_descendants(element_name, reflexive=False) == [] roll_down(view, view.class_leaves()) for element_name in view.all_classes(): - logger.debug(f'{element_name}') - logger.debug(f' {element_name} SLOTS(i) = {view.class_slots(element_name)}') - logger.debug(f' {element_name} SLOTS(d) = {view.class_slots(element_name, direct=True)}') + logger.debug(f"{element_name}") + logger.debug(f" {element_name} SLOTS(i) = {view.class_slots(element_name)}") + logger.debug(f" {element_name} SLOTS(d) = {view.class_slots(element_name, direct=True)}") assert len(view.class_slots(element_name)) == len(view.class_slots(element_name, direct=True)) - assert 'Thing' not in view.all_classes() - assert 'Person' not in view.all_classes() - assert 'Adult' in view.all_classes() + assert "Thing" not in view.all_classes() + assert "Person" not in view.all_classes() + assert "Adult" in view.all_classes() -def test_caching(): - """ - Determine if cache is reset after modifications made to schema - """ - schema = SchemaDefinition(id='test', name='test') + +def test_caching() -> None: + """Determine if cache is reset after modifications made to schema.""" + schema = SchemaDefinition(id="test", name="test") view = SchemaView(schema) assert len([]) == len(view.all_classes()) - view.add_class(ClassDefinition('X')) - assert len(['X']) == len(view.all_classes()) - view.add_class(ClassDefinition('Y')) - assert len(['X', 'Y']) == len(view.all_classes()) + view.add_class(ClassDefinition("X")) + assert len(["X"]) == len(view.all_classes()) + view.add_class(ClassDefinition("Y")) + assert len(["X", "Y"]) == len(view.all_classes()) # bypass view method and add directly to schema; # in general this is not recommended as the cache will # not be updated - view.schema.classes['Z'] = ClassDefinition('Z') + view.schema.classes["Z"] = ClassDefinition("Z") # as expected, the view doesn't know about Z - assert len(['X', 'Y']) == len(view.all_classes()) + assert len(["X", "Y"]) == len(view.all_classes()) # inform the view modifications have been made view.set_modified() # should be in sync - assert len(['X', 'Y', 'Z']) == len(view.all_classes()) + assert len(["X", "Y", "Z"]) == len(view.all_classes()) # recommended way to make updates - view.delete_class('X') + view.delete_class("X") # cache will be up to date - assert len(['Y', 'Z']) == len(view.all_classes()) - view.add_class(ClassDefinition('W')) - assert len(['Y', 'Z', 'W']) == len(view.all_classes()) + assert len(["Y", "Z"]) == len(view.all_classes()) + view.add_class(ClassDefinition("W")) + assert len(["Y", "Z", "W"]) == len(view.all_classes()) -def test_import_map(): - """ - Path to import file should be configurable - """ +def test_import_map() -> None: + """Path to import file should be configurable.""" for im in [{"core": "/no/such/file"}, {"linkml:": "/no/such/file"}]: view = SchemaView(SCHEMA_WITH_IMPORTS, importmap=im) with pytest.raises(FileNotFoundError): @@ -388,30 +421,33 @@ def test_import_map(): for im in [None, {}, {"core": "core"}]: view = SchemaView(SCHEMA_WITH_IMPORTS, importmap=im) view.all_classes() - assert view.imports_closure().sort() == ['kitchen_sink', 'core', 'linkml:types'].sort() # Assert imports closure + assert ( + view.imports_closure().sort() == ["kitchen_sink", "core", "linkml:types"].sort() + ) # Assert imports closure assert ACTIVITY in view.all_classes() # Assert ACTIVITY is in all classes assert ACTIVITY not in view.all_classes(imports=False) # Assert ACTIVITY is not in classes without imports -def test_imports(view): - """view should by default dynamically include imports chain""" - assert (view.schema.source_file is not None) +def test_imports(schema_view_with_imports: SchemaView) -> None: + """View should by default dynamically include imports chain.""" + view = schema_view_with_imports + assert view.schema.source_file is not None logger.debug(view.imports_closure()) - assert set(view.imports_closure()) == {'kitchen_sink', 'core', 'linkml:types'} + assert set(view.imports_closure()) == {"kitchen_sink", "core", "linkml:types"} for t in view.all_types().keys(): - logger.debug(f'T={t} in={view.in_schema(t)}') - assert view.in_schema(ClassDefinitionName('Person')) == 'kitchen_sink' - assert view.in_schema(SlotDefinitionName('id')) == 'core' - assert view.in_schema(SlotDefinitionName('name')) == 'core' - assert view.in_schema(SlotDefinitionName(ACTIVITY)) == 'core' - assert view.in_schema(SlotDefinitionName('string')) == 'types' + logger.debug(f"T={t} in={view.in_schema(t)}") + assert view.in_schema(ClassDefinitionName("Person")) == "kitchen_sink" + assert view.in_schema(SlotDefinitionName("id")) == "core" + assert view.in_schema(SlotDefinitionName("name")) == "core" + assert view.in_schema(SlotDefinitionName(ACTIVITY)) == "core" + assert view.in_schema(SlotDefinitionName("string")) == "types" assert ACTIVITY in view.all_classes() assert ACTIVITY not in view.all_classes(imports=False) - assert 'string' in view.all_types() - assert 'string' not in view.all_types(imports=False) - assert len(view.type_ancestors('SymbolString')) == len(['SymbolString', 'string']) + assert "string" in view.all_types() + assert "string" not in view.all_types(imports=False) + assert len(view.type_ancestors("SymbolString")) == len(["SymbolString", "string"]) for tn, t in view.all_types().items(): assert tn == t.name @@ -419,154 +455,166 @@ def test_imports(view): assert induced_t.uri is not None assert induced_t.base is not None if t in view.all_types(imports=False).values(): - assert t.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' + assert t.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" else: - assert t.from_schema in ['https://w3id.org/linkml/tests/core', 'https://w3id.org/linkml/types'] + assert t.from_schema in ["https://w3id.org/linkml/tests/core", "https://w3id.org/linkml/types"] for en, e in view.all_enums().items(): assert en == e.name if e in view.all_enums(imports=False).values(): - assert e.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' + assert e.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" else: - assert e.from_schema == 'https://w3id.org/linkml/tests/core' + assert e.from_schema == "https://w3id.org/linkml/tests/core" for sn, s in view.all_slots().items(): assert sn == s.name s_induced = view.induced_slot(sn) assert s_induced.range is not None if s in view.all_slots(imports=False).values(): - assert s.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' + assert s.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" else: - assert s.from_schema == 'https://w3id.org/linkml/tests/core' + assert s.from_schema == "https://w3id.org/linkml/tests/core" for cn, c in view.all_classes().items(): assert cn == c.name if c in view.all_classes(imports=False).values(): - assert c.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' + assert c.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" else: - assert c.from_schema == 'https://w3id.org/linkml/tests/core' + assert c.from_schema == "https://w3id.org/linkml/tests/core" for s in view.class_induced_slots(cn): if s in view.all_classes(imports=False).values(): assert s.slot_uri is not None - assert s.from_schema == 'https://w3id.org/linkml/tests/kitchen_sink' - - for c in ['Company', 'Person', 'Organization', 'Thing']: - assert view.induced_slot('id', c).identifier - assert not view.induced_slot('name', c).identifier - assert not view.induced_slot('name', c).required - assert view.induced_slot('name', c).range == 'string' - - for c in ['Event', 'EmploymentEvent', 'MedicalEvent']: - s = view.induced_slot('started at time', c) - assert s.range == 'date' - assert s.slot_uri == 'prov:startedAtTime' - - assert view.induced_slot(AGE_IN_YEARS, 'Person').minimum_value == 0 - assert view.induced_slot(AGE_IN_YEARS, 'Adult').minimum_value == 16 - - assert view.get_class('agent').class_uri == 'prov:Agent' - assert view.get_uri(AGENT) == 'prov:Agent' - logger.debug(view.get_class('Company').class_uri) - - assert view.get_uri(COMPANY) == 'ks:Company' - assert view.get_uri(COMPANY, expand=True) == 'https://w3id.org/linkml/tests/kitchen_sink/Company' - logger.debug(view.get_uri('TestClass')) - assert view.get_uri('TestClass') == 'core:TestClass' - assert view.get_uri('TestClass', expand=True) == 'https://w3id.org/linkml/tests/core/TestClass' - - assert view.get_uri('TestClass', expand=True, use_element_type=True) == 'https://w3id.org/linkml/tests/core/class/TestClass' - assert view.get_uri('TestClass', use_element_type=True) == 'core:class/TestClass' - assert view.get_uri('name', use_element_type=True) == 'core:slot/name' + assert s.from_schema == "https://w3id.org/linkml/tests/kitchen_sink" + + for c in ["Company", "Person", "Organization", "Thing"]: + assert view.induced_slot("id", c).identifier + assert not view.induced_slot("name", c).identifier + assert not view.induced_slot("name", c).required + assert view.induced_slot("name", c).range == "string" + + for c in ["Event", "EmploymentEvent", "MedicalEvent"]: + s = view.induced_slot("started at time", c) + assert s.range == "date" + assert s.slot_uri == "prov:startedAtTime" + + assert view.induced_slot(AGE_IN_YEARS, "Person").minimum_value == 0 + assert view.induced_slot(AGE_IN_YEARS, "Adult").minimum_value == 16 + + assert view.get_class("agent").class_uri == "prov:Agent" + assert view.get_uri(AGENT) == "prov:Agent" + logger.debug(view.get_class("Company").class_uri) + + assert view.get_uri(COMPANY) == "ks:Company" + assert view.get_uri(COMPANY, expand=True) == "https://w3id.org/linkml/tests/kitchen_sink/Company" + logger.debug(view.get_uri("TestClass")) + assert view.get_uri("TestClass") == "core:TestClass" + assert view.get_uri("TestClass", expand=True) == "https://w3id.org/linkml/tests/core/TestClass" + + assert ( + view.get_uri("TestClass", expand=True, use_element_type=True) + == "https://w3id.org/linkml/tests/core/class/TestClass" + ) + assert view.get_uri("TestClass", use_element_type=True) == "core:class/TestClass" + assert view.get_uri("name", use_element_type=True) == "core:slot/name" - assert view.get_uri('string') == 'xsd:string' + assert view.get_uri("string") == "xsd:string" # dynamic enums - e = view.get_enum('HCAExample') - assert set(e.include[0].reachable_from.source_nodes) == {'GO:0007049', 'GO:0022403'} + e = view.get_enum("HCAExample") + assert set(e.include[0].reachable_from.source_nodes) == {"GO:0007049", "GO:0022403"} # units - height = view.get_slot('height_in_m') + height = view.get_slot("height_in_m") assert height.unit.ucum_code == "m" -def test_imports_from_schemaview(view): - """view should by default dynamically include imports chain""" +def test_imports_from_schemaview(schema_view_with_imports: SchemaView) -> None: + """View should by default dynamically include imports chain.""" + view = schema_view_with_imports view2 = SchemaView(view.schema) assert len(view.all_classes()) == len(view2.all_classes()) assert len(view.all_classes(imports=False)) == len(view2.all_classes(imports=False)) -def test_imports_closure_order(): +def test_imports_closure_order() -> None: """Imports should override in a python-like order.""" sv = SchemaView(SCHEMA_IMPORT_TREE) closure = sv.imports_closure(imports=True) target = [ - 'linkml:types', - 's1_1', - 's1_2_1_1_1', 's1_2_1_1_2', - 's1_2_1_1', 's1_2_1', 's1_2', - 's1', - 's2_1', 's2_2', 's2', - 's3_1', 's3_2', 's3', - 'main' + "linkml:types", + "s1_1", + "s1_2_1_1_1", + "s1_2_1_1_2", + "s1_2_1_1", + "s1_2_1", + "s1_2", + "s1", + "s2_1", + "s2_2", + "s2", + "s3_1", + "s3_2", + "s3", + "main", ] assert closure == target -def test_imports_overrides(): +def test_imports_overrides() -> None: """Classes defined in the importing module should override same-named classes in imported modules.""" sv = SchemaView(SCHEMA_IMPORT_TREE) defaults = {} target = {} for name, cls in sv.all_classes(imports=True).items(): target[name] = name - defaults[name] = cls.attributes['value'].ifabsent + defaults[name] = cls.attributes["value"].ifabsent assert defaults == target -def test_imports_relative(): +def test_imports_relative() -> None: """Relative imports from relative imports should evaluate relative to the *importing* schema.""" sv = SchemaView(SCHEMA_RELATIVE_IMPORT_TREE) closure = sv.imports_closure(imports=True) assert len(closure) == len(sv.schema_map.keys()) assert closure == [ - 'linkml:types', - '../neighborhood_parent', - 'neighbor', - '../parent', - '../L1_0_1/L2_0_1_0/grandchild', - '../../L0_1/L1_1_0/L2_1_0_0/apple', - '../../L0_1/L1_1_0/L2_1_0_0/index', - '../../L0_1/L1_1_0/L2_1_0_1/banana', - '../../L0_1/L1_1_0/L2_1_0_1/index', - '../../L0_1/L1_1_0/index', - '../../L0_1/cousin', - '../L1_0_1/dupe', - './L2_0_0_0/child', - './L2_0_0_1/child', - 'L2_0_0_2/two', - 'L2_0_0_2/one', - 'L2_0_0_2/four', - 'L2_0_0_2/three', - 'L2_0_0_2/stepchild', - 'main' + "linkml:types", + "../neighborhood_parent", + "neighbor", + "../parent", + "../L1_0_1/L2_0_1_0/grandchild", + "../../L0_1/L1_1_0/L2_1_0_0/apple", + "../../L0_1/L1_1_0/L2_1_0_0/index", + "../../L0_1/L1_1_0/L2_1_0_1/banana", + "../../L0_1/L1_1_0/L2_1_0_1/index", + "../../L0_1/L1_1_0/index", + "../../L0_1/cousin", + "../L1_0_1/dupe", + "./L2_0_0_0/child", + "./L2_0_0_1/child", + "L2_0_0_2/two", + "L2_0_0_2/one", + "L2_0_0_2/four", + "L2_0_0_2/three", + "L2_0_0_2/stepchild", + "main", ] # check that we can actually get the classes from the same-named schema classes = sv.all_classes(imports=True) - assert 'L110Index' in classes - assert 'L2100Index' in classes - assert 'L2101Index' in classes + assert "L110Index" in classes + assert "L2100Index" in classes + assert "L2101Index" in classes -def test_imports_relative_load(): + +def test_imports_relative_load() -> None: """Relative imports from relative imports should load without FileNotFoundError.""" sv = SchemaView(SCHEMA_RELATIVE_IMPORT_TREE2) sv.imports_closure(imports=True) -def test_direct_remote_imports(): +def test_direct_remote_imports() -> None: """Tests that building a SchemaView directly from a remote URL works.""" view = SchemaView("https://w3id.org/linkml/meta.yaml") main_classes = ["class_definition", "prefix"] @@ -580,20 +628,18 @@ def test_direct_remote_imports(): @pytest.mark.skip("Skipped as fragile: will break if the remote schema changes") -def test_direct_remote_imports_additional(): - """ - Alternative test to: https://github.com/linkml/linkml/pull/1379 - """ +def test_direct_remote_imports_additional() -> None: + """Alternative test to: https://github.com/linkml/linkml/pull/1379.""" url = "https://raw.githubusercontent.com/GenomicsStandardsConsortium/mixs/main/model/schema/mixs.yaml" view = SchemaView(url) assert view.schema.name == "MIxS" class_count = len(view.all_classes()) assert class_count > 0 -def test_merge_imports(view): - """ - ensure merging and merging imports closure works - """ + +def test_merge_imports(schema_view_with_imports: SchemaView) -> None: + """Ensure merging and merging imports closure works.""" + view = schema_view_with_imports all_c = copy(view.all_classes()) all_c_noi = copy(view.all_classes(imports=False)) assert len(all_c_noi) < len(all_c) @@ -603,15 +649,15 @@ def test_merge_imports(view): all_c2_noi = copy(view.all_classes(imports=False)) assert len(all_c2_noi) == len(all_c2) -def test_metamodel_imports(): - """ - Tests imports of the metamodel. + +def test_metamodel_imports() -> None: + """Tests imports of the metamodel. Note: this test and others should be able to run without network connectivity. SchemaView should make use of the version of the metamodel distributed with the package over the network available version. """ - schema = SchemaDefinition(id='test', name='metamodel-imports-test', imports=["linkml:meta"]) + schema = SchemaDefinition(id="test", name="metamodel-imports-test", imports=["linkml:meta"]) sv = SchemaView(schema) all_classes = sv.all_classes() assert len(all_classes) > 20 @@ -620,147 +666,136 @@ def test_metamodel_imports(): assert len(sv.all_classes()) > 20 assert all_classes == sv.all_classes() -def test_non_linkml_remote_import(): - """ - Test that a remote import _not_ using the linkml prefix works. - See: https://github.com/linkml/linkml/issues/1627 + +def test_non_linkml_remote_import() -> None: + """Test that a remote import _not_ using the linkml prefix works. + + See: https://github.com/linkml/linkml/issues/1627. """ schema = SchemaDefinition( - id='test_non_linkml_remote_import', - name='test_non_linkml_remote_import', - prefixes=[ - Prefix( - prefix_prefix="foo", - prefix_reference="https://w3id.org/linkml/" - ) - ], - imports=[ - "foo:types" - ], - slots=[ - SlotDefinition( - name="an_int", - range="integer" - ) - ], - classes=[ - ClassDefinition( - name="AClass", - slots=["an_int"] - ) - ] + id="test_non_linkml_remote_import", + name="test_non_linkml_remote_import", + prefixes=[Prefix(prefix_prefix="foo", prefix_reference="https://w3id.org/linkml/")], + imports=["foo:types"], + slots=[SlotDefinition(name="an_int", range="integer")], + classes=[ClassDefinition(name="AClass", slots=["an_int"])], ) sv = SchemaView(schema) slots = sv.class_induced_slots("AClass", imports=True) assert len(slots) == 1 -def test_traversal(): - schema = SchemaDefinition(id='test', name='traversal-test') + +def test_traversal() -> None: + """Test schema traversal.""" + schema = SchemaDefinition(id="test", name="traversal-test") view = SchemaView(schema) - view.add_class(ClassDefinition('Root', mixins=['RootMixin'])) - view.add_class(ClassDefinition('A', is_a='Root', mixins=['Am1', 'Am2', 'AZ'])) - view.add_class(ClassDefinition('B', is_a='A', mixins=['Bm1', 'Bm2', 'BY'])) - view.add_class(ClassDefinition('C', is_a='B', mixins=['Cm1', 'Cm2', 'CX'])) - view.add_class(ClassDefinition('RootMixin', mixin=True)) - view.add_class(ClassDefinition('Am1', is_a='RootMixin', mixin=True)) - view.add_class(ClassDefinition('Am2', is_a='RootMixin', mixin=True)) - view.add_class(ClassDefinition('Bm1', is_a='Am1', mixin=True)) - view.add_class(ClassDefinition('Bm2', is_a='Am2', mixin=True)) - view.add_class(ClassDefinition('Cm1', is_a='Bm1', mixin=True)) - view.add_class(ClassDefinition('Cm2', is_a='Bm2', mixin=True)) - view.add_class(ClassDefinition('AZ', is_a='RootMixin', mixin=True)) - view.add_class(ClassDefinition('BY', is_a='RootMixin', mixin=True)) - view.add_class(ClassDefinition('CX', is_a='RootMixin', mixin=True)) + view.add_class(ClassDefinition("Root", mixins=["RootMixin"])) + view.add_class(ClassDefinition("A", is_a="Root", mixins=["Am1", "Am2", "AZ"])) + view.add_class(ClassDefinition("B", is_a="A", mixins=["Bm1", "Bm2", "BY"])) + view.add_class(ClassDefinition("C", is_a="B", mixins=["Cm1", "Cm2", "CX"])) + view.add_class(ClassDefinition("RootMixin", mixin=True)) + view.add_class(ClassDefinition("Am1", is_a="RootMixin", mixin=True)) + view.add_class(ClassDefinition("Am2", is_a="RootMixin", mixin=True)) + view.add_class(ClassDefinition("Bm1", is_a="Am1", mixin=True)) + view.add_class(ClassDefinition("Bm2", is_a="Am2", mixin=True)) + view.add_class(ClassDefinition("Cm1", is_a="Bm1", mixin=True)) + view.add_class(ClassDefinition("Cm2", is_a="Bm2", mixin=True)) + view.add_class(ClassDefinition("AZ", is_a="RootMixin", mixin=True)) + view.add_class(ClassDefinition("BY", is_a="RootMixin", mixin=True)) + view.add_class(ClassDefinition("CX", is_a="RootMixin", mixin=True)) def check(ancs, expected): assert ancs == expected - check(view.class_ancestors('C', depth_first=True), - ['C', 'Cm1', 'Cm2', 'CX', 'B', 'Bm1', 'Bm2', 'BY', 'A', 'Am1', 'Am2', 'AZ', 'Root', 'RootMixin']) - check(view.class_ancestors('C', depth_first=False), - ['C', 'Cm1', 'Cm2', 'CX', 'B', 'Bm1', 'Bm2', 'RootMixin', 'BY', 'A', 'Am1', 'Am2', 'AZ', 'Root']) - check(view.class_ancestors('C', mixins=False), - ['C', 'B', 'A', 'Root']) - check(view.class_ancestors('C', is_a=False), - ['C', 'Cm1', 'Cm2', 'CX']) - -def test_slot_inheritance(): - schema = SchemaDefinition(id='test', name='test') + check( + view.class_ancestors("C", depth_first=True), + ["C", "Cm1", "Cm2", "CX", "B", "Bm1", "Bm2", "BY", "A", "Am1", "Am2", "AZ", "Root", "RootMixin"], + ) + check( + view.class_ancestors("C", depth_first=False), + ["C", "Cm1", "Cm2", "CX", "B", "Bm1", "Bm2", "RootMixin", "BY", "A", "Am1", "Am2", "AZ", "Root"], + ) + check(view.class_ancestors("C", mixins=False), ["C", "B", "A", "Root"]) + check(view.class_ancestors("C", is_a=False), ["C", "Cm1", "Cm2", "CX"]) + + +def test_slot_inheritance() -> None: + """Test slot inheritance.""" + schema = SchemaDefinition(id="test", name="test") view = SchemaView(schema) - view.add_class(ClassDefinition('C', slots=['s1', 's2'])) - view.add_class(ClassDefinition('D')) - view.add_class(ClassDefinition('Z')) - view.add_class(ClassDefinition('W')) - - view.add_slot(SlotDefinition('s1', multivalued=True, range='D')) - view.add_slot(SlotDefinition('s2', is_a='s1')) - view.add_slot(SlotDefinition('s3', is_a='s2', mixins=['m1'])) - view.add_slot(SlotDefinition('s4', is_a='s2', mixins=['m1'], range='W')) - view.add_slot(SlotDefinition('m1', mixin=True, multivalued=False, range='Z')) - - slot1 = view.induced_slot('s1', 'C') + view.add_class(ClassDefinition("C", slots=["s1", "s2"])) + view.add_class(ClassDefinition("D")) + view.add_class(ClassDefinition("Z")) + view.add_class(ClassDefinition("W")) + + view.add_slot(SlotDefinition("s1", multivalued=True, range="D")) + view.add_slot(SlotDefinition("s2", is_a="s1")) + view.add_slot(SlotDefinition("s3", is_a="s2", mixins=["m1"])) + view.add_slot(SlotDefinition("s4", is_a="s2", mixins=["m1"], range="W")) + view.add_slot(SlotDefinition("m1", mixin=True, multivalued=False, range="Z")) + + slot1 = view.induced_slot("s1", "C") assert slot1.is_a is None - assert slot1.range == 'D' + assert slot1.range == "D" assert slot1.multivalued is not None - slot2 = view.induced_slot('s2', 'C') - assert slot2.is_a == 's1' - assert slot2.range == 'D' + slot2 = view.induced_slot("s2", "C") + assert slot2.is_a == "s1" + assert slot2.range == "D" assert slot2.multivalued is not None - slot3 = view.induced_slot('s3', 'C') + slot3 = view.induced_slot("s3", "C") assert slot3.multivalued is not None - assert slot3.range == 'Z' + assert slot3.range == "Z" - slot4 = view.induced_slot('s4', 'C') + slot4 = view.induced_slot("s4", "C") assert slot4.multivalued is not None - assert slot4.range == 'W' + assert slot4.range == "W" # Test dangling - view.add_slot(SlotDefinition('s5', is_a='does-not-exist')) + view.add_slot(SlotDefinition("s5", is_a="does-not-exist")) with pytest.raises(ValueError): - view.slot_ancestors('s5') - - -def test_attribute_inheritance(): - """ - Tests attribute inheritance edge cases. - """ - view = SchemaView(os.path.join(INPUT_DIR, 'attribute_edge_cases.yaml')) - expected = [ - ('Root', 'a1', None, "a1"), - ('Root', 'a2', None, "a2"), - ('Root', 'a3', None, "a3"), - ('C1', 'a1', True, "a1m1"), - ('C1', 'a2', True, "a2c1"), - ('C1', 'a3', None, "a3"), - ('C1', 'a4', None, "a4"), - ('C2', 'a1', False, "a1m2"), - ('C2', 'a2', True, "a2c2"), - ('C2', 'a3', None, "a3"), - ('C2', 'a4', True, "a4m2"), - ('C1x', 'a1', True, "a1m1"), - ('C1x', 'a2', True, "a2c1x"), - ('C1x', 'a3', None, "a3"), - ('C1x', 'a4', None, "a4"), - ] - for cn, sn, req, desc in expected: - slot = view.induced_slot(sn, cn) - assert req == slot.required, f"in: {cn}.{sn}" - assert desc == slot.description, f"in: {cn}.{sn}" - assert slot.range == 'string', f"in: {cn}.{sn}" - - -def test_ambiguous_attributes(): - schema = SchemaDefinition(id='test', name='test') + view.slot_ancestors("s5") + + +@pytest.mark.parametrize( + ("cn", "sn", "req", "desc"), + [ + ("Root", "a1", None, "a1"), + ("Root", "a2", None, "a2"), + ("Root", "a3", None, "a3"), + ("C1", "a1", True, "a1m1"), + ("C1", "a2", True, "a2c1"), + ("C1", "a3", None, "a3"), + ("C1", "a4", None, "a4"), + ("C2", "a1", False, "a1m2"), + ("C2", "a2", True, "a2c2"), + ("C2", "a3", None, "a3"), + ("C2", "a4", True, "a4m2"), + ("C1x", "a1", True, "a1m1"), + ("C1x", "a2", True, "a2c1x"), + ("C1x", "a3", None, "a3"), + ("C1x", "a4", None, "a4"), + ], +) +def test_attribute_inheritance(schema_view_attributes: SchemaView, cn: str, sn: str, req: bool, desc: str) -> None: + """Tests attribute inheritance edge cases.""" + slot = schema_view_attributes.induced_slot(sn, cn) + assert req == slot.required, f"in: {cn}.{sn}" + assert desc == slot.description, f"in: {cn}.{sn}" + assert slot.range == "string", f"in: {cn}.{sn}" + + +def test_ambiguous_attributes() -> None: + schema = SchemaDefinition(id="test", name="test") view = SchemaView(schema) - a1 = SlotDefinition('a1', range='string') - a2 = SlotDefinition('a2', range='FooEnum') - a3 = SlotDefinition('a3', range='C3') - view.add_class(ClassDefinition('C1', attributes={a1.name: a1, a2.name: a2, a3.name: a3})) - a1x = SlotDefinition('a1', range='integer') - a2x = SlotDefinition('a2', range='BarEnum') - view.add_class(ClassDefinition('C2', attributes={a1x.name: a1x, a2x.name: a2x})) + a1 = SlotDefinition("a1", range="string") + a2 = SlotDefinition("a2", range="FooEnum") + a3 = SlotDefinition("a3", range="C3") + view.add_class(ClassDefinition("C1", attributes={a1.name: a1, a2.name: a2, a3.name: a3})) + a1x = SlotDefinition("a1", range="integer") + a2x = SlotDefinition("a2", range="BarEnum") + view.add_class(ClassDefinition("C2", attributes={a1x.name: a1x, a2x.name: a2x})) assert view.get_slot(a1.name).range is None assert view.get_slot(a2.name).range is None @@ -769,46 +804,49 @@ def test_ambiguous_attributes(): assert len(view.all_slots(attributes=False)) == 0 assert len(view.all_slots()) == 3 assert view.induced_slot(a3.name).range == a3.range - assert view.induced_slot(a1.name, 'C1').range == a1.range - assert view.induced_slot(a2.name, 'C1').range == a2.range - assert view.induced_slot(a1x.name, 'C2').range == a1x.range - assert view.induced_slot(a2x.name, 'C2').range == a2x.range - - -def test_metamodel_in_schemaview(): - view = package_schemaview('linkml_runtime.linkml_model.meta') - assert 'meta' in view.imports_closure() - assert 'linkml:types' in view.imports_closure() - assert 'meta' in view.imports_closure(imports=False) - assert 'linkml:types' not in view.imports_closure(imports=False) + assert view.induced_slot(a1.name, "C1").range == a1.range + assert view.induced_slot(a2.name, "C1").range == a2.range + assert view.induced_slot(a1x.name, "C2").range == a1x.range + assert view.induced_slot(a2x.name, "C2").range == a2x.range + + +def test_metamodel_in_schemaview() -> None: + """Test using SchemaView with the metamodel.""" + view = package_schemaview("linkml_runtime.linkml_model.meta") + assert "meta" in view.imports_closure() + assert "linkml:types" in view.imports_closure() + assert "meta" in view.imports_closure(imports=False) + assert "linkml:types" not in view.imports_closure(imports=False) assert len(view.imports_closure(imports=False)) == 1 all_classes = list(view.all_classes().keys()) all_classes_no_imports = list(view.all_classes(imports=False).keys()) - for cn in ['class_definition', 'type_definition', 'slot_definition']: + for cn in ["class_definition", "type_definition", "slot_definition"]: assert cn in all_classes assert cn in all_classes_no_imports - assert view.get_identifier_slot(cn).name == 'name' - for cn in ['annotation', 'extension']: + assert view.get_identifier_slot(cn).name == "name" + for cn in ["annotation", "extension"]: assert cn in all_classes, "imports should be included by default" assert cn not in all_classes_no_imports, "imported class unexpectedly included" - for sn in ['id', 'name', 'description']: + for sn in ["id", "name", "description"]: assert sn in view.all_slots() - for tn in ['uriorcurie', 'string', 'float']: + for tn in ["uriorcurie", "string", "float"]: assert tn in view.all_types() - for tn in ['uriorcurie', 'string', 'float']: + for tn in ["uriorcurie", "string", "float"]: assert tn not in view.all_types(imports=False) for cn, c in view.all_classes().items(): uri = view.get_uri(cn, expand=True) assert uri is not None - if cn not in ['structured_alias', 'UnitOfMeasure', 'ValidationReport', 'ValidationResult']: - assert 'https://w3id.org/linkml/' in uri + if cn not in ["structured_alias", "UnitOfMeasure", "ValidationReport", "ValidationResult"]: + assert "https://w3id.org/linkml/" in uri induced_slots = view.class_induced_slots(cn) for s in induced_slots: exp_slot_uri = view.get_uri(s, expand=True) assert exp_slot_uri is not None -def test_get_classes_by_slot(view): +def test_get_classes_by_slot(schema_view_with_imports: SchemaView) -> None: + """Test getting classes by slot.""" + view = schema_view_with_imports slot = view.get_slot(AGE_IN_YEARS) actual_result = view.get_classes_by_slot(slot) expected_result = ["Person"] @@ -819,7 +857,8 @@ def test_get_classes_by_slot(view): assert sorted(actual_result) == sorted(expected_result) -def test_materialize_patterns(): +def test_materialize_patterns() -> None: + """Test pattern materialization.""" sv = SchemaView(SCHEMA_WITH_STRUCTURED_PATTERNS) sv.materialize_patterns() @@ -830,23 +869,26 @@ def test_materialize_patterns(): assert weight_slot.pattern == r"\d+[\.\d+] (kg|g|lbs|stone)" -def test_materialize_patterns_slot_usage(): +def test_materialize_patterns_slot_usage() -> None: + """Test pattern materialization with slot_usage.""" sv = SchemaView(SCHEMA_WITH_STRUCTURED_PATTERNS) sv.materialize_patterns() - name_slot_usage = sv.get_class("FancyPersonInfo").slot_usage['name'] + name_slot_usage = sv.get_class("FancyPersonInfo").slot_usage["name"] assert name_slot_usage.pattern == r"\S+ \S+-\S+" -def test_materialize_patterns_attribute(): +def test_materialize_patterns_attribute() -> None: + """Test pattern materialization with attributes.""" sv = SchemaView(SCHEMA_WITH_STRUCTURED_PATTERNS) sv.materialize_patterns() - weight_attribute = sv.get_class('ClassWithAttributes').attributes['weight'] + weight_attribute = sv.get_class("ClassWithAttributes").attributes["weight"] assert weight_attribute.pattern == r"\d+[\.\d+] (kg|g|lbs|stone)" -def test_mergeimports(): +def test_mergeimports() -> None: + """Ensure that imports are or are not merged, depending on the kwargs.""" # note the change here to include an extra param not in the fixture sv = SchemaView(SCHEMA_WITH_IMPORTS, merge_imports=False) classes_list = list(sv.schema.classes.keys()) @@ -866,16 +908,16 @@ def test_mergeimports(): assert "was generated by" in slots_list prefixes_list = list(sv.schema.prefixes.keys()) - if 'schema' not in prefixes_list: - prefixes_list.append('schema') + if "schema" not in prefixes_list: + prefixes_list.append("schema") assert sorted(prefixes_list) == sorted( - ["pav", "dce", "lego", "linkml", "biolink", "ks", "RO", "BFO", "tax", "core", "prov", "xsd", "schema", "shex"]) + ["pav", "dce", "lego", "linkml", "biolink", "ks", "RO", "BFO", "tax", "core", "prov", "xsd", "schema", "shex"] + ) -def test_is_inlined(): - schema_path = os.path.join(INPUT_DIR, "schemaview_is_inlined.yaml") - sv = SchemaView(schema_path) - cases = [ +@pytest.mark.parametrize( + ("slot_name", "expected_result"), + [ ("a_thing_with_id", False), ("inlined_thing_with_id", True), ("inlined_as_list_thing_with_id", True), @@ -884,14 +926,18 @@ def test_is_inlined(): ("inlined_as_list_thing_without_id", True), ("an_integer", False), ("inlined_integer", False), - ("inlined_as_list_integer", False) - ] - for slot_name, expected_result in cases: - slot = sv.get_slot(slot_name) - assert sv.is_inlined(slot) == expected_result + ("inlined_as_list_integer", False), + ], +) +def test_is_inlined(slot_name: str, expected_result: bool) -> None: + """Tests for slots being inlined or not.""" + schema_path = os.path.join(INPUT_DIR, "schemaview_is_inlined.yaml") + sv = SchemaView(schema_path) + slot = sv.get_slot(slot_name) + assert sv.is_inlined(slot) == expected_result -def test_materialize_nonscalar_slot_usage(): +def test_materialize_nonscalar_slot_usage() -> None: schema_path = os.path.join(INPUT_DIR, "DJ_controller_schema.yaml") sv = SchemaView(schema_path) cls = sv.induced_class("DJController") @@ -909,19 +955,20 @@ def test_materialize_nonscalar_slot_usage(): assert cls.attributes["jog_wheels"].annotations.expected_value.value == "an integer between 0 and 4" assert cls.attributes["volume_faders"].annotations.expected_value.value == "an integer between 0 and 8" - assert cls.attributes["tempo"].examples == [Example(value='120.0'), Example(value='144.0'), Example(value='126.8'), - Example(value='102.6')] + assert cls.attributes["tempo"].examples == [ + Example(value="120.0"), + Example(value="144.0"), + Example(value="126.8"), + Example(value="102.6"), + ] assert cls.attributes["tempo"].annotations.expected_value.value == "a number between 0 and 200" assert cls.attributes["tempo"].annotations.preferred_unit.value == "BPM" assert cls.attributes["tempo"].domain_of == ["DJController"] assert cls.slot_usage["tempo"].domain_of == [] -def test_type_and_slot_with_same_name(): - """ - Test that checks the case where a URI needs to be resolved and a name is ambiguously used for a slot and a - type - """ +def test_type_and_slot_with_same_name() -> None: + """Test that checks the case where a URI needs to be resolved and a name is ambiguously used for a slot and a type.""" schema_definition = SchemaDefinition(id="https://example.org/test#", name="test_schema", default_prefix="ex") view = SchemaView(schema_definition) @@ -931,9 +978,10 @@ def test_type_and_slot_with_same_name(): assert view.get_uri("test", imports=True) == "ex:test" -def test_uris_without_default_prefix(): - """ - Test if uri is correct if no default_prefix is defined for the schema. Issue: linkml/linkml#2578 +def test_uris_without_default_prefix() -> None: + """Test if uri is correct if no default_prefix is defined for the schema. + + See: https://github.com/linkml/linkml/issues/2578 """ schema_definition = SchemaDefinition(id="https://example.org/test#", name="test_schema")