-from typing import Dict, List
+import json
+import copy
+from typing import Dict, List, Set, Optional, Iterable
 
 from pydantic import BaseModel
 
 
+# Todo: Adapt to use the new algorithm
+
+
 class SemanticMatch(BaseModel):
     """
     A semantic match, mapping two semanticIDs with a matching score. Can be imagined as a weighted graph with
     `base_semantic_id` ---`score`---> `match_semantic_id`
 
-    Todo: Think about static and TTL, but that is optimization
-    Todo: Maybe we want to have the matching method as debug information
+    :cvar base_semantic_id: The semantic ID the match originates from
+    :cvar match_semantic_id: The semantic ID that `base_semantic_id` is matched to
+    :cvar score: The semantic similarity score, a float between 0 and 1
+    :cvar path: Optionally, if the `SemanticMatch` did not come directly from a source but was inferred from
+        other `SemanticMatch`es, `path` stores the chain of `SemanticMatch`es it was derived from
+    :cvar meta_information: Optional meta information, such as the source of the `SemanticMatch`
     """
     base_semantic_id: str
     match_semantic_id: str
     score: float
-    meta_information: Dict
+    path: Optional[List["SemanticMatch"]] = None
+    meta_information: Optional[Dict] = None
+
+    def __hash__(self):
+        # `path` is a (mutable, unhashable) list and `meta_information` may be None, so both
+        # need to be converted into hashable, None-safe representations
+        return hash((
+            self.base_semantic_id,
+            self.match_semantic_id,
+            self.score,
+            tuple(self.path) if self.path else None,
+            frozenset(self.meta_information.items()) if self.meta_information else None
+        ))
+
+    @classmethod
+    def combine_semantic_matches(cls, first: "SemanticMatch", second: "SemanticMatch") -> "SemanticMatch":
+        """
+        Construct a new `SemanticMatch` by combining two `SemanticMatch`es.
+
+        Given the following situation:
+            A --0.4--> B
+            B --0.5--> C
+        this constructs a new `SemanticMatch`:
+            A --(0.4*0.5)--> C
+        while updating the `path` information of the new `SemanticMatch`.
+
+        :param first: First `SemanticMatch`
+        :param second: Second `SemanticMatch`. Note that `second.base_semantic_id` needs to be the same
+            as `first.match_semantic_id`
+        :return: The combined `SemanticMatch`
+        """
+        if first.match_semantic_id != second.base_semantic_id:
+            raise ValueError(f"Cannot combine. `first.match_semantic_id` ({first.match_semantic_id}) does not "
+                             f"fit `second.base_semantic_id` ({second.base_semantic_id}).")
+        # The new path is the full chain of `SemanticMatch`es the combined match is derived from:
+        # the matches that make up `first`, followed by the matches that make up `second`
+        new_path = copy.copy(first.path) if first.path else [first]
+        new_path.extend(second.path if second.path else [second])
+        return SemanticMatch(
+            base_semantic_id=first.base_semantic_id,
+            match_semantic_id=second.match_semantic_id,
+            score=first.score * second.score,
+            path=new_path,
+        )
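+
+# A minimal usage sketch of `combine_semantic_matches` (illustrative only, the semantic IDs
+# are made up): combining a direct match A->B with a direct match B->C yields an inferred
+# match A->C whose score is the product of both scores and whose path records the chain.
+#
+#     a_b = SemanticMatch(base_semantic_id="A", match_semantic_id="B", score=0.4)
+#     b_c = SemanticMatch(base_semantic_id="B", match_semantic_id="C", score=0.5)
+#     a_c = SemanticMatch.combine_semantic_matches(first=a_b, second=b_c)
+#     assert a_c.score == 0.4 * 0.5
+#     assert a_c.path == [a_b, b_c]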
+
+
+class SemanticMatchDictStore:
+    """
+    A collection of `SemanticMatch`es, stored in a Dict, where the key is the `base_semantic_id` and the value
+    is the set of `SemanticMatch`es with that `base_semantic_id`. This allows for efficient resolution of the
+    `SemanticMatch`es of a given `base_semantic_id`.
+    """
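+    # Illustrative sketch of the internal layout (semantic IDs made up): after adding matches
+    # A --0.8--> B and A --0.6--> C, the store holds
+    #
+    #     {"A": {<SemanticMatch A->B>, <SemanticMatch A->C>}}
+    #
+    # so all matches of "A" are resolved with a single dict lookup.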
+    def __init__(self, matches: Iterable[SemanticMatch]):
+        self._store: Dict[str, Set[SemanticMatch]] = {}
+        for match in matches:
+            self.add(match)
+
+    def add(self, match: SemanticMatch) -> None:
+        """
+        Add a `SemanticMatch` to the store
+        """
+        if match.base_semantic_id in self._store:
+            self._store[match.base_semantic_id].add(match)
+        else:
+            self._store[match.base_semantic_id] = {match}
+
+    def discard(self, match: SemanticMatch) -> None:
+        """
+        Discard a `SemanticMatch` from the store. Does nothing if the match is not present.
+        """
+        if match.base_semantic_id not in self._store:
+            return
+        # First, we remove the `SemanticMatch` from the set of matches for that `base_semantic_id`
+        self._store[match.base_semantic_id].discard(match)
+        # Then, if there are no more `SemanticMatch`es for that `base_semantic_id`, we remove the Dict entry
+        # completely
+        if not self._store[match.base_semantic_id]:
+            self._store.pop(match.base_semantic_id)
+
+    def get_all_matches(self) -> Set[SemanticMatch]:
+        """
+        Return a set of all `SemanticMatch`es currently inside the store
+        """
+        all_matches: Set[SemanticMatch] = set()
+        for match_set in self._store.values():
+            all_matches.update(match_set)
+        return all_matches
+
+    def get_matches(self, semantic_id: str, min_score: Optional[float] = None,
+                    _visited: Optional[Set[str]] = None) -> Set[SemanticMatch]:
+        """
+        Return all `SemanticMatch`es of a given `semantic_id` currently inside the store that have a score
+        higher than or equal to `min_score`.
+        This is a recursive function that also queries the matches of the matches, as long as the multiplicative
+        score of the combined matches is still higher than or equal to `min_score`.
+        """
+        matches: Set[SemanticMatch] = set()  # This is our return Set
+
+        # We track the semantic_ids already visited on the current path, so that cycles in the
+        # match graph (e.g. A -> B -> A) do not lead to infinite recursion
+        _visited = (_visited or set()) | {semantic_id}
+
+        # First, we check on the current level
+        current_matches_with_any_score = self._store.get(semantic_id, set())
+        current_matches = {
+            match for match in current_matches_with_any_score if min_score is None or match.score >= min_score
+        }
+        # We can already update our return Set, since we know that the `current_matches` will definitely be inside
+        matches.update(current_matches)
+
+        # Now we do the same query for the target of each of the current matches
+        for match in current_matches:
+            if match.match_semantic_id in _visited:
+                continue
+            # We calculate the new minimal score.
+            # The combined score is multiplied: score(A->B) * score(B->C)
+            # This score should be larger than or equal to the requested min_score:
+            #     score(A->B) * score(B->C) >= min_score
+            # score(A->B) is well known, as it is the `match.score`
+            #     => score(B->C) >= (min_score / score(A->B))
+            if min_score is not None:
+                new_min_score = min_score / match.score
+            else:
+                new_min_score = None
+            # Here's the recursive function call; we do the same thing again with the match's target
+            # and the updated `min_score`:
+            new_matches = self.get_matches(
+                semantic_id=match.match_semantic_id, min_score=new_min_score, _visited=_visited
+            )
+            # These new matches are relative to `match.match_semantic_id`, not to the original `semantic_id`,
+            # so we combine them with `match` to create new `SemanticMatch`es that store the path
+            for new_match in new_matches:
+                matches.add(SemanticMatch.combine_semantic_matches(
+                    first=match,
+                    second=new_match
+                ))
+
+        # In the end, we return our return Set
+        return matches
+
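+    # A minimal usage sketch of the transitive query (illustrative only, the semantic IDs are
+    # made up). With A --0.8--> B and B --0.9--> C in the store:
+    #
+    #     store = SemanticMatchDictStore([a_b, b_c])
+    #     store.get_matches("A", min_score=0.7)
+    #
+    # returns a_b (0.8 >= 0.7) plus the combined match A->C, since 0.8 * 0.9 = 0.72 >= 0.7.
+    # With min_score=0.75, only a_b is returned, because the recursion would require
+    # score(B->C) >= 0.75 / 0.8 = 0.9375.
+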
+    def to_file(self, filename: str) -> None:
+        """
+        Serialize all `SemanticMatch`es currently inside the store to a JSON file
+        """
+        matches: List[Dict] = [match.model_dump() for match in self.get_all_matches()]
+        with open(filename, "w") as file:
+            json.dump(matches, file, indent=4)
+
+    @classmethod
+    def from_file(cls, filename: str) -> "SemanticMatchDictStore":
+        """
+        Construct a new `SemanticMatchDictStore` from a JSON file written by `to_file`
+        """
+        with open(filename, "r") as file:
+            matches_data = json.load(file)
+        matches = [SemanticMatch(**match_dict) for match_dict in matches_data]
+        return cls(matches)
+
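+    # Round-trip sketch (illustrative only, the file name is made up):
+    #
+    #     store.to_file("matches.json")
+    #     restored = SemanticMatchDictStore.from_file("matches.json")
+    #     assert len(restored) == len(store)
+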
+    def __len__(self) -> int:
+        """
+        Return the total number of `SemanticMatch`es inside the store
+        """
+        return sum(len(match_set) for match_set in self._store.values())
 
 
 class EquivalenceTable(BaseModel):
|