Skip to content
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ update_model:

test:
poetry run python -m unittest discover
poetry run pytest


# temporary measure until linkml-model is synced
linkml_runtime/processing/validation_datamodel.py: linkml_runtime/processing/validation_datamodel.yaml
Expand Down
139 changes: 121 additions & 18 deletions linkml_runtime/utils/schemaview.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import uuid
import logging
import collections
from dataclasses import fields
from functools import lru_cache
from copy import copy, deepcopy
from collections import defaultdict, deque
from pathlib import Path
from pprint import pprint
from typing import Mapping, Optional, Tuple, TypeVar
import warnings

Expand All @@ -17,6 +19,8 @@
from linkml_runtime.linkml_model.meta import *
from linkml_runtime.exceptions import OrderingError
from enum import Enum
from linkml_runtime.linkml_model.meta import ClassDefinition, SlotDefinition, ClassDefinitionName
from dataclasses import asdict, is_dataclass, fields

logger = logging.getLogger(__name__)

Expand All @@ -36,9 +40,9 @@
ENUM_NAME = Union[EnumDefinitionName, str]

ElementType = TypeVar("ElementType", bound=Element)
ElementNameType = TypeVar("ElementNameType", bound=Union[ElementName,str])
ElementNameType = TypeVar("ElementNameType", bound=Union[ElementName, str])
DefinitionType = TypeVar("DefinitionType", bound=Definition)
DefinitionNameType = TypeVar("DefinitionNameType", bound=Union[DefinitionName,str])
DefinitionNameType = TypeVar("DefinitionNameType", bound=Union[DefinitionName, str])
ElementDict = Dict[ElementNameType, ElementType]
DefDict = Dict[DefinitionNameType, DefinitionType]

Expand All @@ -53,7 +57,6 @@
"""



def _closure(f, x, reflexive=True, depth_first=True, **kwargs):
if reflexive:
rv = [x]
Expand All @@ -77,14 +80,48 @@
return rv


def to_dict(obj):
"""
Convert a LinkML element (such as ClassDefinition) to a dictionary.
:param obj: The LinkML class instance to convert.
:return: A dictionary representation of the class.
"""
if is_dataclass(obj):
return asdict(obj)
elif isinstance(obj, list):
return [to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {key: to_dict(value) for key, value in obj.items()}
else:
return obj

Check warning on line 96 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L96

Added line #L96 was not covered by tests


def get_anonymous_class_definition(class_as_dict: ClassDefinition) -> AnonymousClassExpression:
"""
Convert a ClassDefinition to an AnonymousClassExpression, typically for use in defining an Expression object
(e.g. SlotDefinition.range_expression). This method only fills out the fields that are present in the
AnonymousClassExpression class. #TODO: We should consider whether an Expression should share a common ancestor with
the Definition classes.
:param class_as_dict: The ClassDefinition to convert.
:return: An AnonymousClassExpression.
"""
an_expr = AnonymousClassExpression()
valid_fields = {field.name for field in fields(an_expr)}
for k, v in class_as_dict.items():
if k in valid_fields:
setattr(an_expr, k, v)
for k, v in class_as_dict.items():
setattr(an_expr, k, v)
return an_expr

def load_schema_wrap(path: str, **kwargs):
# import here to avoid circular imports
from linkml_runtime.loaders.yaml_loader import YAMLLoader
yaml_loader = YAMLLoader()
schema: SchemaDefinition
schema = yaml_loader.load(path, target_class=SchemaDefinition, **kwargs)
if "\n" not in path:
# if "\n" not in path and "://" not in path:
# if "\n" not in path and "://" not in path:
# only set path if the input is not a yaml string or URL.
# Setting the source path is necessary for relative imports;
# while initializing a schema with a yaml string is possible, there
Expand Down Expand Up @@ -229,7 +266,8 @@
return schema

@lru_cache(None)
def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, inject_metadata=True) -> List[SchemaDefinitionName]:
def imports_closure(self, imports: bool = True, traverse: Optional[bool] = None, inject_metadata=True) -> List[
SchemaDefinitionName]:
"""
Return all imports

Expand Down Expand Up @@ -314,7 +352,7 @@
visited.add(sn)

# filter duplicates, keeping first entry
closure = list({k:None for k in closure}.keys())
closure = list({k: None for k in closure}.keys())

if inject_metadata:
for s in self.schema_map.values():
Expand Down Expand Up @@ -420,7 +458,6 @@

return {s.name: s for s in slist}


@lru_cache(None)
def all_classes(self, ordered_by=OrderedBy.PRESERVE, imports=True) -> Dict[ClassDefinitionName, ClassDefinition]:
"""
Expand Down Expand Up @@ -865,15 +902,14 @@

@lru_cache(None)
def permissible_value_descendants(self, permissible_value_text: str,
enum_name: ENUM_NAME,
reflexive=True,
depth_first=True) -> List[str]:
enum_name: ENUM_NAME,
reflexive=True,
depth_first=True) -> List[str]:
"""
Closure of permissible_value_children method
:enum
"""


return _closure(lambda x: self.permissible_value_children(x, enum_name),
permissible_value_text,
reflexive=reflexive,
Expand Down Expand Up @@ -1319,6 +1355,7 @@
slots_nr.append(s)
return slots_nr


@lru_cache(None)
def induced_slot(self, slot_name: SLOT_NAME, class_name: CLASS_NAME = None, imports=True,
mangle_name=False) -> SlotDefinition:
Expand All @@ -1332,6 +1369,7 @@
:param slot_name: slot to be queries
:param class_name: class used as context
:param imports: include imports closure
:param mangle_name: if True, the slot name will be mangled to include the class name
:return: dynamic slot constructed by inference
"""
if class_name:
Expand Down Expand Up @@ -1382,11 +1420,46 @@
propagated_from = []
else:
propagated_from = self.class_ancestors(class_name, reflexive=True, mixins=True)

for an in reversed(propagated_from):
induced_slot.owner = an
a = self.get_class(an, imports)
a = self.get_element(an, imports)
# slot usage of the slot in the ancestor class, last ancestor iterated through here is "self"
# so that self.slot_usage overrides ancestor slot_usage at the conclusion of the loop.
anc_slot_usage = a.slot_usage.get(slot_name, {})
# slot name in the ancestor class
# getattr(x, 'y') is equivalent to x.y. None here means raise an error if x.y is not found
v2 = getattr(anc_slot_usage, metaslot_name, None)
# v2 is the value of the metaslot in slot_usage in the ancestor class, which in the loop, means that
# the class itself is the last slot_usage to be considered and applied.
if metaslot_name in ["any_of", "exactly_one_of"]:
if anc_slot_usage != {}:
for ao in anc_slot_usage.any_of:
ao_acd = None
if ao.range is not None:
ao_range = self.get_element(ao.range)
if ao_range:
ao_acd = get_anonymous_class_definition(to_dict(ao_range))
if induced_slot.range_expression is None:
induced_slot.range_expression = AnonymousClassExpression()
if induced_slot.range_expression.any_of is None:
induced_slot.range_expression.any_of = []

Check warning on line 1446 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1446

Added line #L1446 was not covered by tests
# Check for duplicates before appending
if ao_acd is not None and ao_acd not in induced_slot.range_expression.any_of:
induced_slot.range_expression.any_of.append(ao_acd)
for eoo in anc_slot_usage.exactly_one_of:
eoo_acd = None

Check warning on line 1451 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1451

Added line #L1451 was not covered by tests
if eoo.range is not None:
eoo_range = self.get_class(eoo.range)

Check warning on line 1453 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1453

Added line #L1453 was not covered by tests
if eoo_range is not None:
eoo_acd = get_anonymous_class_definition(as_dict(eoo_range))

Check warning on line 1455 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1455

Added line #L1455 was not covered by tests
if induced_slot.range_expression is None:
induced_slot.range_expression = AnonymousClassExpression()

Check warning on line 1457 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1457

Added line #L1457 was not covered by tests
if induced_slot.range_expression.exactly_one_of is None:
induced_slot.range_expression.exactly_one_of = []

Check warning on line 1459 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1459

Added line #L1459 was not covered by tests
# Check for duplicates before appending
if eoo_acd is not None and eoo_acd not in induced_slot.range_expression.exactly_one_of:
induced_slot.range_expression.exactly_one_of.append(eoo_acd)

Check warning on line 1462 in linkml_runtime/utils/schemaview.py

View check run for this annotation

Codecov / codecov/patch

linkml_runtime/utils/schemaview.py#L1462

Added line #L1462 was not covered by tests
if v is None:
v = v2
else:
Expand All @@ -1396,9 +1469,9 @@
else:
# can rewrite below as:
# 1. if v2:
# 2. if v2 is not None and
# 2. if v2 is not None and
# (
# (isinstance(v2, (dict, list)) and v2) or
# (isinstance(v2, (dict, list)) and v2) or
# (isinstance(v2, JsonObj) and as_dict(v2))
# )
if not is_empty(v2):
Expand All @@ -1422,6 +1495,36 @@
if induced_slot.name in c.slots or induced_slot.name in c.attributes:
if c.name not in induced_slot.domain_of:
induced_slot.domain_of.append(c.name)
if induced_slot.range is not None:
pprint(induced_slot)
if induced_slot.range_expression is None:
induced_slot.range_expression = AnonymousClassExpression()
induced_slot.range_expression.any_of = []
range_class = self.get_class(induced_slot.range)
if range_class is not None:
induced_slot.range_expression.any_of.append(
get_anonymous_class_definition(to_dict(range_class))
)
return induced_slot
else:
any_of_ancestors = []
if induced_slot.range_expression.any_of is not None:
for ao_range in induced_slot.range_expression.any_of:
ao_range_class = self.get_class(ao_range.name)
if ao_range_class is not None:
ao_anc = self.class_ancestors(ao_range_class.name)
for a in ao_anc:
if a not in any_of_ancestors:
any_of_ancestors.append(a)
if induced_slot.range in any_of_ancestors:
return induced_slot
else:
range_class = self.get_class(induced_slot.range)
if range_class is not None:
induced_slot.range_expression.any_of.append(
get_anonymous_class_definition(to_dict(range_class))
)
return induced_slot
return induced_slot

@lru_cache(None)
Expand Down Expand Up @@ -1548,7 +1651,7 @@
return True
elif slot.inlined_as_list:
return True

id_slot = self.get_identifier_slot(range, imports=imports)
if id_slot is None:
# must be inlined as has no identifier
Expand Down Expand Up @@ -1592,7 +1695,7 @@
"""
Returns all applicable ranges for a slot

Typically any given slot has exactly one range, and one metamodel element type,
Typically, any given slot has exactly one range, and one metamodel element type,
but a proposed feature in LinkML 1.2 is range expressions, where ranges can be defined as unions

:param slot:
Expand All @@ -1604,9 +1707,9 @@
if x.range:
range_union_of.append(x.range)
return range_union_of

def get_classes_by_slot(
self, slot: SlotDefinition, include_induced: bool = False
self, slot: SlotDefinition, include_induced: bool = False
) -> List[ClassDefinitionName]:
"""Get all classes that use a given slot, either as a direct or induced slot.

Expand Down
3 changes: 3 additions & 0 deletions tests/test_utils/input/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ types:
SymbolString:
typeof: string

NarrativeText:
typeof: string

classes:

activity:
Expand Down
45 changes: 45 additions & 0 deletions tests/test_utils/input/kitchen_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,49 @@ classes:
- id
- name

Alien:
is_a: Thing
slots:
- height_in_m
- type
- related to
slot_usage:
type:
range: OrganismType
required: true
related to:
any_of:
- range: Person
- range: Alien
range: FamilialRelationship
id_prefixes: [ ks ]

Martian:
is_a: Thing
slots:
- height_in_m
- type
- related to
slot_usage:
type:
range: OrganismType
required: true
related to:
range: Alien
required: true

Venetian:
is_a: Thing
slots:
- height_in_m
- type
- related to
slot_usage:
related to:
any_of:
- range: Person
- range: Alien

Person:
is_a: Thing
in_subset:
Expand Down Expand Up @@ -225,6 +268,8 @@ slots:
- subset B
related to:
range: Thing
associated with:
range: Thing
type:
range: string
street:
Expand Down
Loading
Loading