[omm] fetch e2e worked once (#1479)
Dcallies authored Dec 8, 2023
1 parent 87a4333 commit 22d6529
Showing 3 changed files with 40 additions and 18 deletions.
15 changes: 14 additions & 1 deletion open-media-match/src/OpenMediaMatch/background_tasks/fetcher.py
@@ -9,6 +9,7 @@
 from threatexchange.exchanges.fetch_state import (
     CollaborationConfigBase,
     FetchDeltaTyped,
+    NoCheckpointing,
 )
 
 from OpenMediaMatch.background_tasks.development import get_apscheduler
@@ -66,7 +67,8 @@ def fetch(
     api_cls = collab_store.exchange_get_type_configs().get(collab.api)
     if api_cls is None:
         log(
-            "No such SignalExchangeAPI '%s' - maybe it was deleted? You might have serious misconfiguration",
+            "No such SignalExchangeAPI '%s' - maybe it was deleted?"
+            " You might have serious misconfiguration",
             level=logger.critical,
         )
         return
@@ -79,6 +81,17 @@ def fetch(
         log("No checkpoint, should be the first fetch.")
     else:
         if starting_checkpoint.is_stale():
+            # This is a little janky, but the stale behavior is actually fairly complex,
+            # and we want to avoid triggering on trivial fetching
+            if isinstance(starting_checkpoint, NoCheckpointing):
+                log(
+                    "Is a NoCheckpointing class, which hopefully is a test type, "
+                    "and we have a checkpoint. Considering complete"
+                )
+                collab_store.exchange_complete_fetch(
+                    collab.name, is_up_to_date=True, exception=False
+                )
+                return
             log("Checkpoint has become stale! Will refetch from scratch")
             checkpoint = None
         else:
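
A minimal sketch of the control flow this hunk sets up, using stand-in classes rather than the real threatexchange checkpoint types (the assumption, implied by the guard, is that a NoCheckpointing checkpoint always reads as stale):

    # Stand-ins for illustration - not the real threatexchange classes.
    class FakeCheckpoint:
        def is_stale(self) -> bool:
            return False

    class FakeNoCheckpointing(FakeCheckpoint):
        def is_stale(self) -> bool:
            # Assumed: an exchange that can't checkpoint can't prove freshness.
            return True

    def plan_fetch(checkpoint) -> str:
        """Decide what to do with the stored checkpoint before fetching."""
        if checkpoint is None:
            return "first_fetch"
        if checkpoint.is_stale():
            if isinstance(checkpoint, FakeNoCheckpointing):
                # Avoid endlessly refetching trivial/test exchanges from scratch.
                return "mark_complete"
            return "refetch_from_scratch"
        return "resume"

    assert plan_fetch(None) == "first_fetch"
    assert plan_fetch(FakeCheckpoint()) == "resume"
    assert plan_fetch(FakeNoCheckpointing()) == "mark_complete"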
7 changes: 4 additions & 3 deletions open-media-match/src/OpenMediaMatch/storage/postgres/database.py
@@ -223,7 +223,7 @@ def as_checkpoint(
         fetch_status = self.fetch_status
         if fetch_status is None:
             return None
-        api_cls = exchange_types.get(self.name)
+        api_cls = exchange_types.get(self.api_cls)
         if api_cls is None:
             return None
         return fetch_status.as_checkpoint(api_cls)
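
The one-line fix above corrects a lookup key: exchange_types is keyed by the exchange API's type name, so querying it with the collaboration's own name (self.name) silently returned None and the checkpoint was dropped. A hypothetical reduction of the bug (names invented for illustration):

    exchange_types = {"fb_threatexchange": object()}  # keyed by API type name

    collab_name = "my-collab"         # what self.name held
    collab_api = "fb_threatexchange"  # what self.api_cls holds

    assert exchange_types.get(collab_name) is None     # old lookup: checkpoint lost
    assert exchange_types.get(collab_api) is not None  # fixed lookup succeeds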
@@ -300,7 +300,8 @@ class ExchangeData(db.Model):  # type: ignore[name-defined]
     )
 
     fetch_id: Mapped[str] = mapped_column(Text)
-    fetch_data: Mapped[t.Dict[str, t.Any]] = mapped_column(JSON)
+    pickled_original_fetch_data: Mapped[t.Optional[bytes]] = mapped_column(LargeBinary)
+    fetched_metadata_summary: Mapped[t.List[t.Any]] = mapped_column(JSON, default=list)
 
     bank_content_id: Mapped[t.Optional[int]] = mapped_column(
         ForeignKey(BankContent.id), unique=True, index=True
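
The column swap above replaces a lossy JSON dump of the fetched value with a byte-exact pickle, plus a separate JSON summary column for cheap display. A small sketch of the round-trip property the LargeBinary column relies on (FetchValue is a made-up stand-in for an exchange's payload type):

    import pickle
    from dataclasses import dataclass

    @dataclass
    class FetchValue:  # hypothetical payload type
        signal: str
        tags: list

    original = FetchValue(signal="abc123", tags=["media_priority"])
    blob = pickle.dumps(original)  # what the LargeBinary column stores
    restored = pickle.loads(blob)  # what a later read gets back
    assert restored == original    # exact round trip, no JSON coercion

One trade-off worth noting: pickled rows are coupled to the current class definitions, so renaming or moving the payload classes can make old rows unreadable.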
@@ -312,7 +313,7 @@ class ExchangeData(db.Model):  # type: ignore[name-defined]
     # Whether this has been matched by this instance of OMM
     matched: Mapped[bool] = mapped_column(default=False)
     # null = not verified; true = positive class; false = negative class
-    verification_result: Mapped[t.Optional[bool]] = mapped_column(default=False)
+    verification_result: Mapped[t.Optional[bool]] = mapped_column(default=None)
 
     collab: Mapped["CollaborationConfig"] = relationship()
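
Flipping the default to None makes the column genuinely tri-state, as its own comment describes; under the old default=False, a row that had simply never been reviewed was indistinguishable from a confirmed negative. A toy illustration of the three states:

    import typing as t

    def classify(verification_result: t.Optional[bool]) -> str:
        """null = not verified; true = positive class; false = negative class"""
        if verification_result is None:
            return "not yet verified"  # the new default for fresh rows
        return "positive class" if verification_result else "negative class"

    assert classify(None) == "not yet verified"
    assert classify(True) == "positive class"
    assert classify(False) == "negative class"  # old default landed here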
36 changes: 22 additions & 14 deletions open-media-match/src/OpenMediaMatch/storage/postgres/impl.py
@@ -3,6 +3,7 @@
 """
 The default store for accessing persistent data on OMM.
 """
+import pickle
 import time
 import typing as t
@@ -25,6 +26,7 @@
     FetchCheckpointBase,
     CollaborationConfigBase,
     FetchedSignalMetadata,
+    AggregateSignalOpinion,
 )
 
 from OpenMediaMatch.storage.postgres import database, flask_utils
@@ -297,7 +299,7 @@ def exchange_commit_fetch(
         sesh = database.db.session
 
         for k, val in dat.items():
-            # More sequential fetches - trying to use in_ doesn't seem to work
+            # More sequential fetches - trying to use in_() doesn't seem to work
             record = sesh.execute(
                 select(database.ExchangeData)
                 .where(database.ExchangeData.collab_id == cfg.id)
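
For reference, the batched query the comment says didn't work would look roughly like this in SQLAlchemy 2.x style, reusing sesh, cfg, and dat from the hunk above (a sketch of the alternative, not what the commit ships - the committed code keeps one query per key):

    from sqlalchemy import select

    records = sesh.execute(
        select(database.ExchangeData)
        .where(database.ExchangeData.collab_id == cfg.id)
        .where(database.ExchangeData.fetch_id.in_([str(k) for k in dat]))
    ).scalars()
    by_fetch_id = {record.fetch_id: record for record in records}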
@@ -311,8 +313,8 @@ def exchange_commit_fetch(
                 record = database.ExchangeData()
                 record.collab_id = cfg.id
                 record.fetch_id = str(k)
-                sesh.add(record)
-            record.fetch_data = dataclass_json.dataclass_dump_dict(val)
+                sesh.add(record)
+            record.pickled_original_fetch_data = pickle.dumps(val)
             sesh.flush()
             self._sync_exchange_data_to_bank(
                 cfg.import_bank_id, collab_config, record, api_cls, k, val
@@ -321,9 +323,9 @@ def exchange_commit_fetch(
         if fetch_status is None:
             fetch_status = database.ExchangeFetchStatus()
             fetch_status.collab = cfg
-            sesh.add(fetch_status)
 
         fetch_status.set_checkpoint(checkpoint)
 
+        sesh.add(fetch_status)
         sesh.commit()

def _sync_exchange_data_to_bank(
Expand All @@ -335,19 +337,25 @@ def _sync_exchange_data_to_bank(
fetch_key: t.Any,
fetch_value: t.Any,
) -> None:
if record.verification_result is False:
# We marked this as not valuable, so don't sync it to the bank
return
all_signal_types = self.get_signal_type_configs()
as_signal_types = api_cls.naive_convert_to_signal_type(
[stc.signal_type for stc in all_signal_types.values()],
cfg,
{fetch_key: fetch_value},
)
# Merge all the metadata together for one aggregated opinion for this record
# This might not be fully correct for all
all_signal_opinions = [
opinion
for signal_metadata in as_signal_types.values()
for metadata in signal_metadata.values()
for opinion in metadata.get_as_opinions()
]
arregated = AggregateSignalOpinion.from_opinions(all_signal_opinions)

sesh = database.db.session
bank_content = record.bank_content
if not as_signal_types:
if not as_signal_types or record.verification_result is False:
# there's no usable signals, so don't create an empty record
if bank_content is not None:
sesh.delete(bank_content)
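
The added comprehension flattens two levels of mapping - signal type to signal value to metadata - into a single opinion list before aggregation. A toy reduction of that flattening, with a stand-in class instead of the real threatexchange metadata type:

    class FakeMetadata:  # stand-in for a fetched-metadata object
        def __init__(self, opinions):
            self._opinions = opinions

        def get_as_opinions(self):
            return self._opinions

    as_signal_types = {
        "pdq": {"facefacefaceface": FakeMetadata(["owner_1: positive"])},
        "video_md5": {"d41d8cd98f00b204": FakeMetadata(["owner_2: positive"])},
    }

    all_signal_opinions = [
        opinion
        for signal_metadata in as_signal_types.values()
        for metadata in signal_metadata.values()
        for opinion in metadata.get_as_opinions()
    ]
    assert all_signal_opinions == ["owner_1: positive", "owner_2: positive"]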
@@ -359,7 +367,7 @@ def _sync_exchange_data_to_bank(
                 record.bank_content = bank_content
                 sesh.add(bank_content)
 
-        existing = {
+        unseen_signals_in_db_for_fetch_key = {
             (signal.signal_type, signal.signal_val): signal
             for signal in bank_content.signals
         }
@@ -370,10 +378,10 @@ def _sync_exchange_data_to_bank(
                 # TODO - check the metadata for signals for opinions we own
                 # that have false-positive on them.
                 k = (signal_type.get_name(), signal_value)
-                if k in existing:
+                if k in unseen_signals_in_db_for_fetch_key:
                     # If we need to sync the record, here's where we do it
-                    # Remove from existing list for removal check later
-                    del existing[k]
+                    # Remove from the list of signals
+                    del unseen_signals_in_db_for_fetch_key[k]
                 else:
                     content_signal = database.ContentSignal(
                         content=bank_content,
@@ -385,7 +393,7 @@ def _sync_exchange_data_to_bank(
             # Removals
             # At this point, we've popped all the ones that are still in the record
             # Any left are ones that have been removed from the API copy
-            for to_delete in existing.values():
+            for to_delete in unseen_signals_in_db_for_fetch_key.values():
                 database.db.session.delete(to_delete)
             sesh.flush()
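
The rename spells out the reconciliation pattern these three hunks implement: start from every signal currently in the DB, pop each one the API still reports, and whatever is left unseen must have been removed upstream. The same pattern in isolation:

    def reconcile(in_db: dict, fetched: set) -> tuple:
        """Return (keys_to_add, keys_to_delete) to make in_db match fetched."""
        unseen = dict(in_db)  # everything starts out "not yet seen this fetch"
        keys_to_add = []
        for key in fetched:
            if key in unseen:
                unseen.pop(key)          # still present upstream - keep it
            else:
                keys_to_add.append(key)  # new upstream - insert it
        return keys_to_add, list(unseen)  # leftovers were deleted upstream

    to_add, to_delete = reconcile({"a": 1, "b": 2}, {"b", "c"})
    assert to_add == ["c"] and to_delete == ["a"]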

