-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhance: add broadcast for streaming service #39020
enhance: add broadcast for streaming service #39020
Conversation
@chyezh go-sdk check failed, comment |
adc133d
to
1292957
Compare
@chyezh E2e jenkins job failed, comment |
129b3bd
to
9e1bfb5
Compare
@chyezh go-sdk check failed, comment |
@chyezh cpp-unit-test check failed, comment |
rerun cpp-unit-test |
rerun go-sdk |
- Add new rpc for transfer broadcast to streaming coord - Add broadcast service at streaming coord to make broadcast message sent Signed-off-by: chyezh <[email protected]>
9e1bfb5
to
4fe6647
Compare
|
||
func (c *catalog) SaveBroadcastTask(ctx context.Context, task *streamingpb.BroadcastTask) error { | ||
key := buildBroadcastTaskPath(task.TaskId) | ||
if task.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the RemoveBroadcastTask method to delete metadata from the catalog?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to keep only state transfer semantic at catalog layer.
|
||
// Start n workers to handle the broadcast task. | ||
wg := sync.WaitGroup{} | ||
for i := 0; i < 4; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the CPU count instead of hardcoding the number 4.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it
case task := <-b.backoffChan: | ||
// task is backoff, push it into backoff queue to make a delay retry. | ||
b.backoffs.Push(task) | ||
case <-nextBackOff: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reinsert the backoff task at the front of the pending queue; otherwise, consuming messages waiting for Barrie will take a long time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it
|
||
// Poll polls the task, return nil if the task is done, otherwise not done. | ||
// Poll can be repeated called until the task is done. | ||
func (b *broadcastTask) Poll(ctx context.Context, operator AppendOperator) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method is somewhat confusingly named "poll," as it actually executes the task. so suggest separating the task's state check and its execution into two distinct methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it
if len(newPendings) == 0 { | ||
b.future.Set(&types.BroadcastAppendResult{AppendResults: b.appendResult}) | ||
} | ||
b.logger.Info("broadcast task make a new broadcast done", zap.Int("pendingMessages", len(b.pendingMessages))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use "backoff messages " instead of "pendingMessages"
Address the issue in the next PR. |
[APPROVALNOTIFIER] This PR is APPROVED Approval requirements bypassed by manually added approval. This pull-request has been approved by: chyezh The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
issue: #38399 pr: #39020 - Add new rpc for transfer broadcast to streaming coord - Add broadcast service at streaming coord to make broadcast message sent automicly also cherry pick the pr #38400 --------- Signed-off-by: chyezh <[email protected]>
issue: #38399