Pinot real-time data ingestion freshness guarantee #14815

Open
lnbest0707-uber opened this issue Jan 15, 2025 · 1 comment
Labels: ingestion, PEP-Request (Pinot Enhancement Proposal request to be reviewed), real-time

Comments

@lnbest0707-uber
Contributor

[Issue and Proposal]

One of the biggest challenges of Pinot real-time data ingestion is keeping ingestion latency consistently low, especially when the stream (e.g. Kafka) experiences traffic volume increases (spikes).

Under the hood, Pinot maintains a 1:1 subscription to stream partitions: a single consuming-segment thread is responsible for pulling data from one stream partition, so ingestion speed is bottlenecked by the throughput of a single CPU core. With heavy decoding, transformation and indexing, the per-partition ingestion speed is capped. Hence, once the per-partition data volume spikes, Pinot can experience high ingestion latency.

To catch up with the latest messages, Pinot usually has to wait for a partition increase on the stream server, e.g. a Kafka partition count bump. This operation typically requires human involvement and cannot be applied immediately. As a result, the table experiences high ingestion latency for a long time and, in most cases, cannot recover quickly even after the partition count increases.

Meanwhile, in many real-time analytics use cases, e.g. observability-related non-upsert cases:

  • Users do not require strict message-ordering guarantees.
  • The most recent data (e.g. the last 15 minutes) is far more valuable than older data.

As a result, in real-world operations, users may ask Pinot admins to manually reset the ingestion offset to the latest via the following steps (see the sketch after this list):

  1. Call the Table/pauseConsumption API.
  2. After all segments are sealed, call the Table/resumeConsumption API with consumeFrom = largest.
    This forces consumption of the latest messages immediately, but the skipped in-between messages are lost.
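
For illustration, a minimal sketch of this manual reset against the controller REST API, assuming a controller at localhost:9000 and a hypothetical table named myTable_REALTIME; exact endpoint paths and parameters may differ across Pinot versions:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ManualOffsetReset {
  // Assumed controller address and table name; adjust for your deployment.
  private static final String CONTROLLER = "http://localhost:9000";
  private static final String TABLE = "myTable_REALTIME";

  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();

    // Step 1: pause consumption so that all consuming segments get sealed.
    HttpRequest pause = HttpRequest.newBuilder()
        .uri(URI.create(CONTROLLER + "/tables/" + TABLE + "/pauseConsumption"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    System.out.println(client.send(pause, HttpResponse.BodyHandlers.ofString()).body());

    // ... wait until the controller reports that all consuming segments are sealed ...

    // Step 2: resume from the largest (latest) offset, skipping the backlog.
    HttpRequest resume = HttpRequest.newBuilder()
        .uri(URI.create(CONTROLLER + "/tables/" + TABLE + "/resumeConsumption?consumeFrom=largest"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    System.out.println(client.send(resume, HttpResponse.BodyHandlers.ofString()).body());
  }
}
```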

Given the above issues, a near-freshness-guarantee feature is required, so that Pinot can:

  • Reset the offset automatically based on the offset or timestamp lag.
  • Consume and serve the latest messages instantly without permanently losing the in-between data.
  • Ingest the in-between data in an offline or near-real-time manner.

To fulfill this requirement, the proposed solution works as follows (a decision sketch follows the list):

  1. When a segment seals, the controller checks the last ingested offset, compares it with Kafka's largest offset, and determines whether to skip ahead based on (configurable):
    • Offset difference
    • Timestamp difference (based on Kafka metadata)
  2. If skipping from offset1 to offset2, spawn a backfill job.
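
A minimal sketch of how that skip decision could look on the controller side; the class, the threshold names and the way the metadata is obtained are hypothetical, not existing Pinot code:

```java
/**
 * Hypothetical skip policy evaluated when a consuming segment seals.
 * Thresholds would come from table/stream configuration.
 */
public class OffsetSkipPolicy {
  private final long maxOffsetLag;       // skip if we are more than N offsets behind
  private final long maxTimestampLagMs;  // skip if the last ingested record is older than N ms

  public OffsetSkipPolicy(long maxOffsetLag, long maxTimestampLagMs) {
    this.maxOffsetLag = maxOffsetLag;
    this.maxTimestampLagMs = maxTimestampLagMs;
  }

  /**
   * @param lastIngestedOffset offset of the last record in the sealed segment
   * @param latestStreamOffset largest offset currently available in the Kafka partition
   * @param lastIngestedTsMs   Kafka record timestamp of the last ingested message
   * @param latestStreamTsMs   Kafka record timestamp of the newest message in the partition
   * @return true if the next consuming segment should start from the latest offset
   */
  public boolean shouldSkip(long lastIngestedOffset, long latestStreamOffset,
                            long lastIngestedTsMs, long latestStreamTsMs) {
    boolean offsetLagTooHigh = (latestStreamOffset - lastIngestedOffset) > maxOffsetLag;
    boolean timeLagTooHigh = (latestStreamTsMs - lastIngestedTsMs) > maxTimestampLagMs;
    return offsetLagTooHigh || timeLagTooHigh;
  }
}
```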

The backfill could be based on a Pinot Minion job (a sketch of the skip record follows the list):

  • The controller updates the ideal state and forces the next segment to consume from offset2.
  • The offset-skip info (Pinot partition number, offset1, offset2) is dumped to Helix/ZK.
  • A Minion job is triggered to consume the recorded offset range and build segments offline.
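
For illustration, the offset-skip info persisted to Helix/ZK could be modeled roughly like this; the field names, the status values and the class itself are assumptions, not an existing Pinot schema:

```java
/**
 * Illustrative record of a skipped offset range, to be persisted (e.g. under the table's
 * property store in ZK) so a Minion task can backfill it offline.
 */
public class OffsetSkipInfo {
  public final String tableNameWithType;
  public final int streamPartition;  // Pinot/Kafka partition id
  public final long fromOffset;      // offset1: first skipped offset (inclusive)
  public final long toOffset;        // offset2: next consuming segment starts here (exclusive)
  public final String status;        // e.g. PENDING -> IN_PROGRESS -> DONE

  public OffsetSkipInfo(String tableNameWithType, int streamPartition,
                        long fromOffset, long toOffset, String status) {
    this.tableNameWithType = tableNameWithType;
    this.streamPartition = streamPartition;
    this.fromOffset = fromOffset;
    this.toOffset = toOffset;
    this.status = status;
  }
}
```

One such record per (partition, skip) pair would let the Minion task generator enumerate outstanding gaps and mark them DONE once the backfilled segments are uploaded.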

Alternatively, the backfill could also be done in a near-real-time way with multi-topic ingestion (#13790):

  • The controller adds a temporary topic to the table.
  • The temporary topic is responsible for consuming the missing offsets.

In particular, if the stream system already has replication support, like Uber's uReplicator, to replicate the in-between messages to another topic, the flow can be simplified as follows (see the sketch after this list):

  1. The controller notifies the stream system to replicate (topic name, partition number, from offset, to offset) to a "new topic".
  2. The controller temporarily adds the "new topic" to the table config and ideal state.
  3. Finish the consumption of the "new topic".
  4. Remove the temporary topic.
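
A heavily hedged sketch of this temporary-topic lifecycle; the StreamReplicator and TableConfigManager interfaces are placeholders for whatever replication service (e.g. uReplicator) and controller plumbing would actually be used, and none of these methods exist as written:

```java
/** Hypothetical orchestration of the temporary-topic backfill. */
public class TemporaryTopicBackfill {

  public interface StreamReplicator {
    // Replicate [fromOffset, toOffset) of the given topic-partition into a new topic.
    String replicateRange(String sourceTopic, int partition, long fromOffset, long toOffset);
  }

  public interface TableConfigManager {
    void addSecondaryTopic(String tableNameWithType, String topic);     // multi-topic ingestion (#13790)
    void removeSecondaryTopic(String tableNameWithType, String topic);
    boolean isTopicFullyConsumed(String tableNameWithType, String topic);
  }

  public void backfill(StreamReplicator replicator, TableConfigManager configManager,
                       String table, String sourceTopic, int partition,
                       long fromOffset, long toOffset) throws InterruptedException {
    // 1. Ask the stream system to copy the skipped range into a temporary topic.
    String tempTopic = replicator.replicateRange(sourceTopic, partition, fromOffset, toOffset);

    // 2. Temporarily add the new topic to the table config / ideal state.
    configManager.addSecondaryTopic(table, tempTopic);

    // 3. Wait until the temporary topic has been fully consumed.
    while (!configManager.isTopicFullyConsumed(table, tempTopic)) {
      Thread.sleep(60_000L);
    }

    // 4. Remove the temporary topic again.
    configManager.removeSecondaryTopic(table, tempTopic);
  }
}
```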

With both proposals, some challenges still need to be considered/resolved:

  • How to deal with multiple offset resets on a partition?
  • How to split the offset range across multiple Minion jobs (what kind of policy to follow)? Or, how many partitions should be provisioned for the "new topic"?
  • How to save and update the offset-skip and backfill status?
@wirybeaver
Contributor

wirybeaver commented Jan 16, 2025

Backfilling through Minion jobs makes more sense; creating a temporary topic would add more overhead to handle failure scenarios. For example, the number of topics would blow up if multiple ingestion spikes happen, and it is unclear how to guarantee consistency between the temporary topics and the offset gaps.

We could add the consuming-segment metadata in Helix for Minion tasks. When the backfilling is done, delete the temporary task info. There should be a GC Minion task that deletes stale Minion task info for reconciliation.
MinionTask: ConsumingSegments1: startOffset=offset1, endOffset=offset2, status=InProgress

Then add the consuming-segment metadata as normal for the Pinot servers:
ConsumingSegments2: startOffset=offset2, status=InProgress

Actually, what I am thinking of is a sort of async Pinot-Kafka consumer. We could have another backfilling thread pool on the Pinot server to consume and seal [offset1, offset2), not necessarily a Minion task, so that we can reuse the existing pinot-server code (a rough sketch below).
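
A rough sketch of such a backfill worker, assuming a plain Kafka consumer that reads exactly [offset1, offset2) of one partition; how the records are turned into a sealed Pinot segment is left as a placeholder, and this is not existing pinot-server code:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/** Backfill worker for one skipped offset range of one partition. */
public class BackfillConsumer implements Runnable {
  private final String bootstrapServers;
  private final String topic;
  private final int partition;
  private final long startOffset; // offset1 (inclusive)
  private final long endOffset;   // offset2 (exclusive)

  public BackfillConsumer(String bootstrapServers, String topic, int partition,
                          long startOffset, long endOffset) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
    this.partition = partition;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }

  @Override
  public void run() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("enable.auto.commit", "false");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition(topic, partition);
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, startOffset);

      long nextOffset = startOffset;
      while (nextOffset < endOffset) {
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
          if (record.offset() >= endOffset) {
            nextOffset = endOffset;
            break;
          }
          // TODO: decode, transform and index the record into the backfill segment.
          nextOffset = record.offset() + 1;
        }
      }
      // TODO: seal the segment covering [startOffset, endOffset) and mark the task DONE.
    }
  }

  public static void main(String[] args) {
    // Example: backfill partition 3 of a hypothetical "events" topic for [offset1, offset2).
    ExecutorService backfillPool = Executors.newFixedThreadPool(4);
    backfillPool.submit(new BackfillConsumer("localhost:9092", "events", 3, 1_000_000L, 2_000_000L));
    backfillPool.shutdown();
  }
}
```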

@Jackie-Jiang added the ingestion, real-time and PEP-Request labels on Jan 23, 2025