Problem
ReactHandler (and likewise ForwardHandler and DeleteMessageHandler) has no memory of which messages it has already processed. Every pipeline run re-fetches the same messages within the since_hours window and re-applies the action:
- Re-reaction: SendReactionRequest replaces the existing reaction, which wastes a Telegram API call and risks triggering FloodWait
- Re-forward: may duplicate forwarded messages in the target dialog
- Re-delete: silently fails (message already gone), but still burns an API slot
For the react pipeline specifically: with since_hours=24 and a 5-minute interval, each join message stays inside the window for about 288 runs (24 h × 12 runs per hour), so it gets re-reacted ~288 times before it ages out.
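As a quick sanity check on that figure, here is the arithmetic as a small sketch. The constant names are illustrative only and are not taken from the codebase:

```python
# Illustrative constants; the real values come from the pipeline config.
SINCE_HOURS = 24        # width of the fetch window
INTERVAL_MINUTES = 5    # how often the pipeline runs

# Number of runs during which a single message is still inside the window.
runs_per_window = SINCE_HOURS * 60 // INTERVAL_MINUTES
print(runs_per_window)
```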
Root Cause
FetchMessagesHandler returns all messages in the time window unconditionally. There is no per-pipeline, per-run, or per-node record of "already acted on message_id=X".
generation_runs.metadata stores only aggregate action_counts — no individual message IDs.
Proposed Solution
Track processed (pipeline_id, node_id, channel_id, message_id, action) tuples in a new table, pipeline_action_log:
CREATE TABLE pipeline_action_log (
    id          INTEGER PRIMARY KEY,
    pipeline_id INTEGER NOT NULL,
    node_id     TEXT    NOT NULL,
    channel_id  INTEGER NOT NULL,
    message_id  INTEGER NOT NULL,
    action      TEXT    NOT NULL,  -- "react", "forward", "delete"
    created_at  TEXT    NOT NULL,
    UNIQUE(pipeline_id, node_id, channel_id, message_id, action)
);
Before acting on a message, the handler checks this table. If a matching row is already present, it skips the message. After the action succeeds, it inserts a row.
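A minimal sketch of that check-then-insert flow, assuming plain sqlite3 access. The helper names (already_processed, mark_processed, process_new) and the act callback are hypothetical and only stand in for whatever the handlers actually call:

```python
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_action_log (
    id INTEGER PRIMARY KEY,
    pipeline_id INTEGER NOT NULL,
    node_id TEXT NOT NULL,
    channel_id INTEGER NOT NULL,
    message_id INTEGER NOT NULL,
    action TEXT NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE(pipeline_id, node_id, channel_id, message_id, action)
)
"""

def already_processed(conn, pipeline_id, node_id, channel_id, message_id, action):
    """Return True if this pipeline+node already applied `action` to the message."""
    row = conn.execute(
        "SELECT 1 FROM pipeline_action_log "
        "WHERE pipeline_id=? AND node_id=? AND channel_id=? "
        "AND message_id=? AND action=?",
        (pipeline_id, node_id, channel_id, message_id, action),
    ).fetchone()
    return row is not None

def mark_processed(conn, pipeline_id, node_id, channel_id, message_id, action):
    """Record a successful action; the UNIQUE constraint makes repeats a no-op."""
    conn.execute(
        "INSERT OR IGNORE INTO pipeline_action_log "
        "(pipeline_id, node_id, channel_id, message_id, action, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (pipeline_id, node_id, channel_id, message_id, action,
         datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()

def process_new(conn, pipeline_id, node_id, action, messages, act):
    """Apply `act` to each (channel_id, message_id) not yet logged.

    `act` is a hypothetical callback that performs the real action and
    raises on failure, so a failed message is never written to the log.
    Returns the number of messages newly processed in this run.
    """
    processed = 0
    for channel_id, message_id in messages:
        key = (pipeline_id, node_id, channel_id, message_id, action)
        if already_processed(conn, *key):
            continue
        act(channel_id, message_id)
        mark_processed(conn, *key)
        processed += 1
    return processed
```

Running process_new twice over the same message list should act on every message the first time and on none the second time. Writing the row only after act returns means a crash between the action and the insert can still cause one repeat, which seems acceptable for reactions but may need more care for forwards.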
Alternative (lighter): store processed message IDs in generation_runs.metadata as "reacted_ids": [...]. Cheaper to implement but harder to query across runs.
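A rough sketch of the lighter variant, assuming the metadata column holds a JSON object and that the caller loads the previous run's metadata itself. The function name split_new is hypothetical, and the sketch treats message IDs as unique on their own, which only holds within a single channel:

```python
import json

def split_new(metadata_json, message_ids, key="reacted_ids"):
    """Return (new_ids, updated_metadata_json).

    new_ids are the message IDs not yet listed under `key`. The updated
    metadata lists old and new IDs together and keeps all other fields
    (for example action_counts) untouched. The caller should persist the
    updated metadata only after the actions have succeeded.
    """
    metadata = json.loads(metadata_json) if metadata_json else {}
    seen = set(metadata.get(key, []))
    new_ids = [m for m in message_ids if m not in seen]
    metadata[key] = sorted(seen | set(new_ids))
    return new_ids, json.dumps(metadata)
```

The ID list grows with every run unless it is trimmed to the since_hours window, and finding the latest list means reading back through earlier runs, which is the cross-run query cost mentioned above.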
Acceptance Criteria
- ReactHandler skips messages already reacted to by this pipeline+node
- ForwardHandler skips already-forwarded messages
- DeleteMessageHandler skips already-deleted messages
- result_count=0 (nothing new to process)