Skip to content

chore: do not assume all batches in stream fetch response are complete#4627

Merged
morenol merged 1 commit intofluvio-community:masterfrom
morenol:fix-issue-with-batch-not-complete
Feb 1, 2026
Merged

chore: do not assume all batches in stream fetch response are complete#4627
morenol merged 1 commit intofluvio-community:masterfrom
morenol:fix-issue-with-batch-not-complete

Conversation

@morenol
Copy link
Collaborator

@morenol morenol commented Dec 31, 2025

Fix issue reported on discord with this code:

async fn main() -> Result<()> {
    let fluvio = Fluvio::connect_with_config(&FluvioConfig::new("127.0.0.1:9003")).await?;

    let producer = fluvio.topic_producer("raw-logs").await?;

    let mut count = 0;
    let target = 100_000;

    for _i in 1..=target {
        count += 1;
        producer
            .send(
                RecordKey::NULL,
                format!("This is the sample log that i am sending to the topic to check if the count is fine in this or not, and current count is {}", count),
            )
            .await?;
        
    }
    let consumer_config = ConsumerConfigExtBuilder::default()
        .topic("raw-logs")
        .offset_consumer("raw-consumer".to_string())
        .offset_start(Offset::beginning())
        .offset_strategy(fluvio::consumer::OffsetManagementStrategy::Auto)
        .build()?;

    let mut stream = fluvio.consumer_with_config(consumer_config).await?;
    let mut prev: i64 = -1;
    let mut total = 0;
    while let Some(Ok(record)) = stream.next().await {
        let _raw_line = String::from_utf8_lossy(record.value()).to_string();

        if record.offset - prev != 1 {
            println!("Offset broke here at count {} record {}", prev, record.offset);
        }
        total += 1;
        prev = record.offset;
    }
    Ok(())
}

That is happening because we use next_offset_for_fetch which computes the offset based on metadata of last batch in response which is based on batch metadata. But if that batches does not decode correctly due to being truncated by SPU, the not decoded records from that batch are skipped.

Has some conflicts with #4522 not sure why there we had to ignore decoding errors from batches

@morenol morenol requested a review from fraidev December 31, 2025 02:27
Copy link
Contributor

@fraidev fraidev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! What happened with our tests? Would be good to run them to have sure there is no regression.

@morenol
Copy link
Collaborator Author

morenol commented Dec 31, 2025

Do you remember why we had to add this in #4522?:

        let mut decode = |data: &[u8]| -> Result<(), CompressionError> {
            match records.decode(&mut &*data, 0) {
                Ok(_) => Ok(()),
                Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
                    debug!("not enough bytes for decoding memory records from raw");
                    Ok(())
                }
                Err(err) => Err(err.into()),
            }
        };

I think that the changes in crates/fluvio/src/consumer/mod.rs are not needed if we don't ignore the error there

@digikata
Copy link
Contributor

digikata commented Jan 2, 2026

The tests along with a lot of the workflows need to be simplified and decoupled from the old infra. There probably only a few little changes it looks like.

@morenol
Copy link
Collaborator Author

morenol commented Jan 2, 2026

The tests along with a lot of the workflows need to be simplified and decoupled from the old infra

Sure, should we create a new docker hub org or publish the image on github packages?

@digikata
Copy link
Contributor

digikata commented Jan 2, 2026

I lean towards trying a github package first

@digikata digikata force-pushed the fix-issue-with-batch-not-complete branch from 4391ad9 to 9f1fd48 Compare January 8, 2026 10:11
@morenol morenol added the no-stale Opt-out of closing issue due to no activity label Jan 15, 2026
@morenol morenol force-pushed the fix-issue-with-batch-not-complete branch 3 times, most recently from 4735576 to 1e57199 Compare January 31, 2026 18:19
@morenol morenol force-pushed the fix-issue-with-batch-not-complete branch 3 times, most recently from 58bed8a to d8e80a0 Compare February 1, 2026 14:59
@morenol morenol force-pushed the fix-issue-with-batch-not-complete branch from d8e80a0 to dc70a1b Compare February 1, 2026 15:20
@morenol morenol added this pull request to the merge queue Feb 1, 2026
Merged via the queue into fluvio-community:master with commit a90addc Feb 1, 2026
99 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

no-stale Opt-out of closing issue due to no activity

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants