1
- from typing import Dict , List
1
+ import json
2
+ import copy
3
+ from typing import Dict , List , Set , Optional , Iterable
2
4
3
5
from pydantic import BaseModel
4
6
@@ -8,13 +10,159 @@ class SemanticMatch(BaseModel):
8
10
A semantic match, mapping two semanticIDs with a matching score. Can be imagined as a weighted graph with
9
11
`base_semantic_id` ---`score`---> `match_semantic_id`
10
12
11
- Todo: Think about static and TTL, but that is optimization
12
- Todo: Maybe we want to have the matching method as debug information
13
+ :cvar base_semantic_id:
14
+ :cvar match_semantic_id:
15
+ :cvar score: The semantic similarity score, a float between 0 and 1
16
+ :cvar path: Optionally, if the `SemanticMatch` did not come from a source but is inferred by another `SemanticMatch`
17
+ the `path` stores the SemanticMatches it came from
18
+ :cvar meta_information: Optional meta_information, such as the source of the `SemanticMatch`
13
19
"""
14
20
base_semantic_id : str
15
21
match_semantic_id : str
16
22
score : float
17
- meta_information : Dict
23
+ path : Optional [List ["SemanticMatch" ]] = None
24
+ meta_information : Optional [Dict ] = None
25
+
26
def __hash__(self):
    """
    Return a hash over all fields, so that `SemanticMatch`es can be stored in sets and used as Dict keys.

    `path` (a list) and `meta_information` (a Dict) are not hashable themselves and both may be `None`,
    therefore they are converted into hashable stand-ins before hashing.

    :return: The hash value of this `SemanticMatch`
    """
    # A list cannot be hashed, but a tuple of its (hashable) entries can
    path_key = None if self.path is None else tuple(self.path)

    if self.meta_information is None:
        meta_key = None
    else:
        try:
            meta_key = frozenset(self.meta_information.items())
        except TypeError:
            # Some value is unhashable (e.g. a nested list or Dict). Falling back to the keys only keeps
            # the hash usable: it is coarser, but equal Dicts always have equal keys.
            meta_key = frozenset(self.meta_information)

    return hash((
        self.base_semantic_id,
        self.match_semantic_id,
        self.score,
        path_key,
        meta_key,
    ))
34
+
35
@classmethod
def combine_semantic_matches(cls, first: "SemanticMatch", second: "SemanticMatch") -> "SemanticMatch":
    """
    Chain two adjacent `SemanticMatch`es into a single, inferred `SemanticMatch`.

    For the two matches
        A --0.4--> B
        B --0.5--> C
    the result is
        A --(0.4*0.5)--> C
    The `path` of the result starts with `second`, followed by the entries of `second.path` (if any).

    :param first: The leading `SemanticMatch`
    :param second: The trailing `SemanticMatch`; its `base_semantic_id` must equal `first.match_semantic_id`
    :return: A new `SemanticMatch` from `first.base_semantic_id` to `second.match_semantic_id`
    :raises KeyError: If the two matches are not adjacent
    """
    if first.match_semantic_id != second.base_semantic_id:
        raise KeyError(
            f"Cannot combine. `first.match_semantic_id` ({first.match_semantic_id} ) does not "
            f"fit `second.base_semantic_id` ({second.base_semantic_id} )."
        )

    # Build a fresh list, so that the `path` of `second` is never modified
    chain = [second]
    if second.path:
        chain.extend(second.path)

    return SemanticMatch(
        base_semantic_id=first.base_semantic_id,
        match_semantic_id=second.match_semantic_id,
        score=first.score * second.score,
        path=chain,
    )
66
+
67
class SemanticMatchDictStore:
    """
    A collection of `SemanticMatch`es, stored in a Dict, where the Key is the `base_semantic_id` and the Value is
    the set of `SemanticMatch` objects starting at that `base_semantic_id`. This allows for efficient resolution
    of the `SemanticMatch`es of a `base_semantic_id`.
    """
    def __init__(self, matches: Iterable["SemanticMatch"]):
        """
        :param matches: The `SemanticMatch`es the store is initially filled with
        """
        self._store: Dict[str, Set["SemanticMatch"]] = {}
        for match in matches:
            self.add(match)

    def add(self, match: "SemanticMatch") -> None:
        """
        Add a `SemanticMatch` to the store
        """
        self._store.setdefault(match.base_semantic_id, set()).add(match)

    def discard(self, match: "SemanticMatch") -> None:
        """
        Discard a `SemanticMatch` from the store. Does nothing if the `SemanticMatch` is not inside the store.
        """
        bucket = self._store.get(match.base_semantic_id)
        if bucket is None:
            # Nothing is stored for that `base_semantic_id`, so there is nothing to discard
            return
        bucket.discard(match)
        # If there are no more `SemanticMatch`es for that `base_semantic_id`, we remove the Dict entry completely
        if not bucket:
            del self._store[match.base_semantic_id]

    def get_all_matches(self) -> Set["SemanticMatch"]:
        """
        Return a set of all `SemanticMatch`es currently inside the store
        """
        all_matches: Set["SemanticMatch"] = set()
        for bucket in self._store.values():
            all_matches.update(bucket)
        return all_matches

    def get_matches(self, semantic_id: str, min_score: Optional[float] = None) -> Set["SemanticMatch"]:
        """
        Return all `SemanticMatch`es of a given semantic_id currently inside the store that have a higher or
        equal score than the `min_score`.

        Besides the directly stored matches, this also follows the matches of the matches, as long as the
        multiplied score of the chain is still higher or equal to the `min_score`. Such inferred matches are
        created via `SemanticMatch.combine_semantic_matches`.

        :param semantic_id: The semantic_id to resolve the matches for
        :param min_score: Minimal score a returned match needs to have. `None` disables the filter.
        :return: The set of direct and inferred `SemanticMatch`es starting at `semantic_id`
        """
        return self._collect_matches(semantic_id, min_score, frozenset((semantic_id,)))

    def _collect_matches(self, semantic_id: str, min_score: Optional[float], visited: frozenset) -> Set["SemanticMatch"]:
        """
        Recursive worker of `get_matches`. `visited` holds the semantic_ids on the current chain, which
        prevents following a cycle forever.
        """
        # First, we check on the current level
        direct_matches = {
            match for match in self._store.get(semantic_id, ())
            if min_score is None or match.score >= min_score
        }
        # The direct matches are definitely part of the result
        matches: Set["SemanticMatch"] = set(direct_matches)

        for match in direct_matches:
            target = match.match_semantic_id
            if target in visited:
                # Following this match would lead us back to a semantic_id we already came through
                continue
            # The combined score is multiplied: score(A->B) * score(B->C) >= min_score
            # score(A->B) is known, it is `match.score`
            # => score(B->C) >= (min_score / score(A->B))
            if min_score is not None and match.score > 0:
                next_min_score = min_score / match.score
            else:
                next_min_score = min_score
            # We continue at the *target* of the match and extend the set of visited semantic_ids
            onward_matches = self._collect_matches(target, next_min_score, visited | {target})
            # The onward matches are relative to `target`, so we create new `SemanticMatch`es that are
            # relative to `semantic_id` and carry the path information
            for onward in onward_matches:
                matches.add(SemanticMatch.combine_semantic_matches(first=match, second=onward))

        return matches

    def to_file(self, filename: str) -> None:
        """
        Write all `SemanticMatch`es of the store as a JSON list to the given file

        :param filename: Path of the file to write
        """
        matches: List[Dict] = [match.model_dump() for match in self.get_all_matches()]
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(matches, file, indent=4)

    @classmethod
    def from_file(cls, filename: str) -> "SemanticMatchDictStore":
        """
        Create a store from a JSON file containing a list of `SemanticMatch` Dicts

        :param filename: Path of the file to read
        :return: The store filled with the `SemanticMatch`es of the file
        """
        with open(filename, "r", encoding="utf-8") as file:
            matches_data = json.load(file)
        return cls(SemanticMatch(**match_dict) for match_dict in matches_data)

    def __len__(self) -> int:
        """
        Return the number of `SemanticMatch`es inside the store
        """
        return sum(len(bucket) for bucket in self._store.values())
18
166
19
167
20
168
class EquivalenceTable (BaseModel ):
0 commit comments