go/worker/storage/committee: Fix teardown #6444
base: master
Conversation
Previously the fetch pool was closed first, which caused doneCh to never be closed, which in turn caused wg.Wait to never finish. Probably a better approach is to fix workerpool.Pool.
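Roughly, the teardown ordering at issue, as a self-contained sketch; the Pool here is a stand-in, not the real workerpool.Pool, and the names are illustrative:

```go
package main

import (
	"context"
	"sync"
)

// Stand-in for workerpool.Pool: Submit returns a channel that is closed
// once the job completes; Stop waits for in-flight jobs.
type Pool struct{ wg sync.WaitGroup }

func (p *Pool) Submit(job func()) <-chan struct{} {
	done := make(chan struct{})
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		job()
		close(done)
	}()
	return done
}

func (p *Pool) Stop() { p.wg.Wait() }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fetchPool := &Pool{}

	var wg sync.WaitGroup
	doneCh := fetchPool.Submit(func() { <-ctx.Done() }) // job returns on cancel
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-doneCh // released only once the job's doneCh is closed
	}()

	// Teardown order matters: cancel first so jobs return and doneCh is
	// closed, then wait for the waiters, and only then stop the pool.
	// Per the description above, stopping the real pool first could leave
	// doneCh unclosed, hanging wg.Wait forever.
	cancel()
	wg.Wait()
	fetchPool.Stop()
}
```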
```go
// once the task is complete or the pool is stopped.
func (p *Pool) Submit(job func()) <-chan struct{} {
```
I believe the correct way (arguably over-engineering) would be:

```go
func (p *Pool) Submit(jobCtx context.Context, job func(ctx context.Context)) <-chan struct{} {
```
The first parameter would be the job context. The pool would also have an internal context. The job should be canceled either if the job context is canceled or if the pool context is canceled. The pool context would be canceled by pool.Stop (which should possibly block until all jobs have finished, or there could be a separate method for that).
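A rough, self-contained sketch of what I have in mind (illustrative only, not the real workerpool API; the context merge uses context.AfterFunc from Go 1.21+):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Illustrative pool with an internal context; not the real workerpool.Pool.
type Pool struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func NewPool() *Pool {
	ctx, cancel := context.WithCancel(context.Background())
	return &Pool{ctx: ctx, cancel: cancel}
}

// Submit runs job with a context that is canceled when either jobCtx or the
// pool's internal context is canceled; the returned channel is closed once
// the job returns.
func (p *Pool) Submit(jobCtx context.Context, job func(ctx context.Context)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ctx, cancel := context.WithCancel(jobCtx)
		defer cancel()
		stop := context.AfterFunc(p.ctx, cancel) // propagate pool cancellation
		defer stop()
		job(ctx)
	}()
	return done
}

// Stop cancels the pool context, canceling all running jobs.
func (p *Pool) Stop() { p.cancel() }

func main() {
	p := NewPool()
	done := p.Submit(context.Background(), func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println("job canceled")
		case <-time.After(time.Second):
			fmt.Println("job finished")
		}
	})
	p.Stop() // pool cancellation reaches the job via its ctx
	<-done
}
```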
The current pattern, e.g.:

```go
doneCh := fetchPool.Submit(func() {
	w.fetchDiff(ctx, this.Round, prevRoots[i], this.Roots[i])
})
wg.Go(func() {
	<-doneCh
})
```

I find this non-idiomatic and error-prone.
Regardless, I would stick to a simpler solution, either commit 1 or 2:
In general, goroutines are cheap, so worker pools shouldn't be typical in Go. You can use the counting-semaphore pattern and/or buffered channels instead to limit concurrency. The worker pool also internally uses eapache/channels, which is deprecated, and it suffers from the unbounded-submit issue as well. Given that we only use it in two places, I am not sure this package is actually needed long-term, hence the preference for the simpler solutions.
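A minimal sketch of what I mean, using a buffered channel as a counting semaphore (maxWorkers and the job body are illustrative):

```go
package main

import "sync"

func main() {
	const maxWorkers = 4
	sem := make(chan struct{}, maxWorkers) // counting semaphore

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxWorkers jobs are in flight
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			// ... do the work, e.g. the fetchDiff call
		}()
	}
	wg.Wait() // no pool to tear down afterwards
}
```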
This could remove the need for the previous commit.
Force-pushed from c5b803a to 1eeb529.
```go
w.status = api.StatusStarting
w.statusLock.Unlock()

var fetchPool *workerpool.Pool
```
These changes don't change anything, or am I missing something? And why is this change better? You just introduced the possibility that the pool can be nil, which was not possible before.
```go
for item := range p.jobCh.Out() { //nolint:revive
	job := item.(*jobDescriptor)
	if job.completeCh != nil {
		close(job.completeCh)
```
Now you are using completeCh for two things, job completion and job cancellation, and users cannot distinguish between them.
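For illustration only (not the current workerpool API), one way to make the two cases distinguishable would be to send an outcome on the channel before closing it:

```go
package main

import "fmt"

// Hypothetical outcome type so callers can tell completion from cancellation.
type JobOutcome int

const (
	JobCompleted JobOutcome = iota
	JobCanceled
)

func main() {
	completeCh := make(chan JobOutcome, 1)

	// Pool side: report why the channel is being released.
	poolStopped := true // e.g. Stop ran before the job was picked up
	if poolStopped {
		completeCh <- JobCanceled
	} else {
		completeCh <- JobCompleted
	}
	close(completeCh)

	// Caller side: the outcome is now observable.
	switch <-completeCh {
	case JobCompleted:
		fmt.Println("job ran to completion")
	case JobCanceled:
		fmt.Println("job was canceled")
	}
}
```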
https://github.com/oasisprotocol/internal-ops/issues/1317#issuecomment-3765815368 showed that, in the case of corrupted storage, our worker teardown might get stuck.
How to test
Start your node with a paratime configured and return a dummy error here.
Prior to this change, the indexer would continue, whilst the storage worker would get stuck at teardown.
It would be nice to have a test for this, but we would need to completely refactor the storage worker first. Mainly, the state DB, p2p, and other dependencies should be passed as parameters so that errors can be mocked in the "integration" tests.