Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: define and implement core interfaces for async api #1591

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions util/async/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"sync"
)

// Pool is an interface for goroutine pool.
type Pool interface {
// Go submits a function to the goroutine pool.
Go(f func())
}

// Executor is an interface that can append functions to be executed asynchronously.
type Executor interface {
Pool
// Append adds functions to the executor. It should be safe to call Append concurrently.
Append(fs ...func())
}

// Callback defines a callback function that can be invoked immediately or later.
type Callback[T any] interface {
// Executor returns the executor that the callback uses.
Executor() Executor
// Inject adds a deferred action that will be invoked before the callback.
Inject(g func(T, error) (T, error))
// Invoke invokes the callback immediately in current goroutine.
Invoke(val T, err error)
// Schedule schedules the callback to be invoked later, it's typically called in other goroutines.
Schedule(val T, err error)
}

// NewCallback creates a new callback function.
func NewCallback[T any](e Executor, f func(T, error)) Callback[T] {
return &callback[T]{e: e, f: f}
}

type callback[T any] struct {
once sync.Once
e Executor
f func(T, error)
gs []func(T, error) (T, error)
}

// Executor implements Callback.
func (cb *callback[T]) Executor() Executor {
return cb.e
}

// Inject implements Callback.
func (cb *callback[T]) Inject(g func(T, error) (T, error)) {
cb.gs = append(cb.gs, g)
}

// Invoke implements Callback.
func (cb *callback[T]) Invoke(val T, err error) {
cb.once.Do(func() { cb.call(val, err) })
}

// Schedule implements Callback.
func (cb *callback[T]) Schedule(val T, err error) {
cb.once.Do(func() { cb.e.Append(func() { cb.call(val, err) }) })
}

func (cb *callback[T]) call(val T, err error) {
for i := len(cb.gs) - 1; i >= 0; i-- {
val, err = cb.gs[i](val, err)
}
cb.f(val, err)
}
90 changes: 90 additions & 0 deletions util/async/core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

type mockExecutor struct {
lock sync.Mutex
tasks []func()
}

func (e *mockExecutor) Go(f func()) {
e.Append(f)
}

func (e *mockExecutor) Append(fs ...func()) {
e.lock.Lock()
e.tasks = append(e.tasks, fs...)
e.lock.Unlock()
}

func TestInjectOrder(t *testing.T) {
cb := NewCallback(&mockExecutor{}, func(ns []int, err error) {
require.NoError(t, err)
// injected functions are executed in reverse order
require.Equal(t, []int{1, 2, 3}, ns)
})
cb.Inject(func(ns []int, err error) ([]int, error) { return append(ns, 3), nil })
cb.Inject(func(ns []int, err error) ([]int, error) { return append(ns, 2), nil })
cb.Inject(func(ns []int, err error) ([]int, error) { return append(ns, 1), nil })
cb.Invoke([]int{}, nil)
}

func TestFulfillOnce(t *testing.T) {
t.Run("InvokeTwice", func(t *testing.T) {
ns := []int{}
cb := NewCallback(&mockExecutor{}, func(n int, err error) { ns = append(ns, n) })
cb.Invoke(1, nil)
cb.Invoke(2, nil)
require.Equal(t, []int{1}, ns)
})
t.Run("ScheduleTwice", func(t *testing.T) {
e := &mockExecutor{}
ns := []int{}
cb := NewCallback(e, func(n int, err error) { ns = append(ns, n) })
cb.Schedule(1, nil)
cb.Schedule(2, nil)
require.Equal(t, 1, len(e.tasks))
require.Equal(t, []int{}, ns)
e.tasks[0]()
require.Equal(t, []int{1}, ns)
})
t.Run("InvokeSchedule", func(t *testing.T) {
e := &mockExecutor{}
ns := []int{}
cb := NewCallback(e, func(n int, err error) { ns = append(ns, n) })
cb.Invoke(1, nil)
cb.Schedule(2, nil)
require.Equal(t, 0, len(e.tasks))
require.Equal(t, []int{1}, ns)
})
t.Run("ScheduleInvoke", func(t *testing.T) {
e := &mockExecutor{}
ns := []int{}
cb := NewCallback(e, func(n int, err error) { ns = append(ns, n) })
cb.Schedule(1, nil)
cb.Invoke(2, nil)
require.Equal(t, 1, len(e.tasks))
require.Equal(t, []int{}, ns)
e.tasks[0]()
require.Equal(t, []int{1}, ns)
})
}
158 changes: 158 additions & 0 deletions util/async/runloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"context"
"errors"
"sync"
)

// State represents the state of a run loop.
type State uint32

const (
StateIdle State = iota
StateWaiting
StateRunning
)

// RunLoop implements the Executor interface.
type RunLoop struct {
Pool

lock sync.Mutex
ready chan struct{}
runnable []func()
running []func()
state State
}

// NewRunLoop creates a new run-loop.
func NewRunLoop() *RunLoop {
return &RunLoop{ready: make(chan struct{})}
}

// Go submits f to the pool when possible (pool is not nil), otherwise starts a new goroutine for f.
func (l *RunLoop) Go(f func()) {
if l.Pool == nil {
go f()
} else {
l.Pool.Go(f)
}
}

// State returns the current state of the run-loop.
func (l *RunLoop) State() State {
l.lock.Lock()
state := l.state
l.lock.Unlock()
return state
}

// NumRunnable returns the number of runnable tasks in the run-loop currently.
func (l *RunLoop) NumRunnable() int {
l.lock.Lock()
n := len(l.runnable)
l.lock.Unlock()
return n
}

// Append implements the Executor interface. It's safe to call Append concurrently.
func (l *RunLoop) Append(fs ...func()) {
if len(fs) == 0 {
return
}

notify := false

l.lock.Lock()
l.runnable = append(l.runnable, fs...)
if l.state == StateWaiting {
l.state = StateIdle // waiting -> idle
notify = true
}
l.lock.Unlock()

if notify {
l.ready <- struct{}{}
}
}

// Exec drives the run-loop to execute all runnable tasks and returns the number of tasks executed. If the context is
// done before all tasks are executed, it returns the number of tasks executed and the context error. Exec turns the
// run-loop to running or waiting state during process, and finally to idle state on return. When calling Exec without
// pending runnables, the run-loop turns to waiting, in which case one should make sure that Append will be called in
// the other goroutine to wake it up later, or the context will be canceled finally to break the waiting. Exec should
// only be called by one goroutine.
func (l *RunLoop) Exec(ctx context.Context) (int, error) {
for {
l.lock.Lock()
if l.state != StateIdle {
l.lock.Unlock()
return 0, errors.New("runloop: already executing")
}
// assert l.state == stateIdle

if len(l.runnable) == 0 {
l.state = StateWaiting // idle -> waiting
l.lock.Unlock()
select {
case <-l.ready:
continue
case <-ctx.Done():
l.lock.Lock()
l.state = StateIdle // waiting -> idle
l.lock.Unlock()
return 0, ctx.Err()
}
} else {
l.running, l.runnable = l.runnable, l.running[:0]
l.state = StateRunning // idle -> running
l.lock.Unlock()
return l.run(ctx)
}
}
}

func (l *RunLoop) run(ctx context.Context) (int, error) {
count := 0
for {
for i, f := range l.running {
select {
case <-ctx.Done():
l.lock.Lock()
// move remaining running tasks to runnable
l.running = append(l.running[:0], l.running[i:]...)
l.running = append(l.running, l.runnable...)
l.running, l.runnable = l.runnable, l.running
l.state = StateIdle // running -> idle
l.lock.Unlock()
return count, ctx.Err()
default:
f()
count++
}
}
l.lock.Lock()
if len(l.runnable) == 0 {
l.state = StateIdle // running -> idle
l.lock.Unlock()
return count, nil
}
l.running, l.runnable = l.runnable, l.running[:0]
l.lock.Unlock()
}
}
Loading
Loading