-
Notifications
You must be signed in to change notification settings - Fork 11.7k
[indexer-alt-framework] Set checkpoint_hi_inclusive, reader_lo, pruner_hi if watermark does not exist #24523
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
26aee1c to
6f10779
Compare
6f10779 to
e69ea47
Compare
crates/sui-pg-db/src/store.rs
Outdated
| first_checkpoint: u64, | ||
| ) -> anyhow::Result<u64> { | ||
| // Create a StoredWatermark directly from CommitterWatermark | ||
| let stored_watermark = StoredWatermark { | ||
| pipeline: pipeline_task.to_string(), | ||
| epoch_hi_inclusive: 0, | ||
| checkpoint_hi_inclusive: first_checkpoint as i64 - 1, | ||
| tx_hi: 0, | ||
| timestamp_ms_hi_inclusive: 0, | ||
| reader_lo: first_checkpoint as i64, | ||
| pruner_timestamp: NaiveDateTime::UNIX_EPOCH, | ||
| pruner_hi: first_checkpoint as i64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think as a trait method we should have init_watermark accept three checkpoint values for checkpoint_hi_inclusive, reader_lo, pruner_hi, or have the doc comments explain that init_watermark is supposed to set these values a particular way
Also, I think we want pruner_hi <= reader_lo < checkpoint_hi_inclusive, which should be safe if we handle when checkpoint_hi_inclusive is 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't checkpoint_hi_inclusive = 0 mean that checkpoint 0 was indexed which is not true initially?
e69ea47 to
a44ecd9
Compare
a44ecd9 to
5e8baab
Compare
5e8baab to
b42f096
Compare
b42f096 to
18274e0
Compare
| async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> { | ||
| async fn add_pipeline<P: Processor + 'static>( | ||
| &mut self, | ||
| init_watermark: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose alternatively, we don't need to add this function parameter, and just check self.task.is_none() in add_pipeline right? If it's a tasked indexer, we don't need to seed un-watermarked pipelines, because they're tasked pipelines and don't have the ability to prune
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially added this parameter to differentiate between concurrent pipelines (need watermark) and sequential pipelines (do not need watermark) pipelines, but needed modify it to also exclude concurrent tasked pipelines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I see. I think it doesn't really matter what we seed a sequential pipeline's reader_lo and pruner_hi to, which is why I was suggesting that we could consolidate the logic into add_pipeline, and just check for self.task.is_none() && self.default_next_checkpoint > 0 there. That would mean for sequential pipelines, indexer starting at a non-genesis checkpoint would seed it with the same watermark a concurrent pipeline would get if no watermark entry exists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, then again, from another perspective, this is only needed if a pipeline is capable of pruning. And that is a concurrent pipeline running in a main indexer..
18274e0 to
7fc131b
Compare
7fc131b to
eb9dd76
Compare
wlmyng
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving to unblock
Wanted to circle back/ raise the discussion on add_pipeline but I don't feel too strongly about it
And had some comments on the tests that would be nice to address
| async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> { | ||
| async fn add_pipeline<P: Processor + 'static>( | ||
| &mut self, | ||
| init_watermark: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I see. I think it doesn't really matter what we seed a sequential pipeline's reader_lo and pruner_hi to, which is why I was suggesting that we could consolidate the logic into add_pipeline, and just check for self.task.is_none() && self.default_next_checkpoint > 0 there. That would mean for sequential pipelines, indexer starting at a non-genesis checkpoint would seed it with the same watermark a concurrent pipeline would get if no watermark entry exists.
| .with_context(|| format!("Failed to get watermark for {pipeline_task}"))?; | ||
| let watermark = if init_watermark { | ||
| let init_watermark = InitWatermark { | ||
| checkpoint_hi_inclusive: self.default_next_checkpoint as i64 - 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe can add a // SAFETY: checked > 0 earlier
| async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> { | ||
| async fn add_pipeline<P: Processor + 'static>( | ||
| &mut self, | ||
| init_watermark: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, then again, from another perspective, this is only needed if a pipeline is capable of pruning. And that is a concurrent pipeline running in a main indexer..
| ..Default::default() | ||
| }) | ||
| .await; | ||
| assert_eq!(committer_watermark, None); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a comment like, "indexer will not seed the watermark, pipeline tasks will write commit watermarks as normal" would be nice
| #[tokio::test] | ||
| async fn test_init_watermark_concurrent_pipeline_first_checkpoint_1() { | ||
| let (committer_watermark, pruner_watermark) = | ||
| test_init_watermark(InitCheckpointArgs::default()).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default to me reads = 0
I think for tests we should explicitly be like, "InitCheckpointArgs::init(heckpoint 1)"
Description
Without the change in this PR, the indexer logs these errors when started using a non-zero
--first-checkpointfor the first time (no watermark record exists):This error is caused by the pruner attempting to prune data that was never indexed because
pruner_hiis initialized to0.This PR sets
checkpoint_hi_inclusive,reader_lo,pruner_hibased on thedefault_checkpointwhich is either the value from--first-checkpoint(defaulting to0if not set). When the pruner runs, it uses this value ofpruner_hito avoid trying to prune data that was never indexed.Test plan
a. Deploy sui-indexer-alt-benchmark with
pulumi up(this line causes the benchmark to use the branch from this PR https://github.com/MystenLabs/sui-operations/blob/c3011e3ae777c8b58019c4a83211aefa54f8d372/pulumi/gcp/sui-indexer-alt-benchmark/Pulumi.dev.yaml#L8)b. Verify error is not in logs
Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.
--first-checkpoint.