Skip to content

Commit

Permalink
Add Scheduler
Browse files Browse the repository at this point in the history
- Renamed previously called scheduler to forwarder to resolve name
  conflicts
  • Loading branch information
hibiken committed Oct 13, 2020
1 parent fadcae7 commit 50e7f38
Show file tree
Hide file tree
Showing 19 changed files with 989 additions and 187 deletions.
18 changes: 11 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Client struct {
rdb *rdb.RDB
}

// NewClient and returns a new Client given a redis connection option.
// NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r))
return &Client{
Expand Down Expand Up @@ -208,6 +208,9 @@ type Result struct {
// ID is a unique identifier for the task.
ID string

// EnqueuedAt is the time the task was enqueued in UTC.
EnqueuedAt time.Time

// ProcessAt indicates when the task should be processed.
ProcessAt time.Time

Expand Down Expand Up @@ -298,12 +301,13 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
return nil, err
}
return &Result{
ID: msg.ID.String(),
ProcessAt: opt.processAt,
Queue: msg.Queue,
Retry: msg.Retry,
Timeout: timeout,
Deadline: deadline,
ID: msg.ID.String(),
EnqueuedAt: time.Now().UTC(),
ProcessAt: opt.processAt,
Queue: msg.Queue,
Retry: msg.Retry,
Timeout: timeout,
Deadline: deadline,
}, nil
}

Expand Down
36 changes: 19 additions & 17 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
processAt: now,
opts: []Option{},
wantRes: &Result{
ProcessAt: now,
Queue: "default",
Retry: defaultMaxRetry,
Timeout: defaultTimeout,
Deadline: noDeadline,
EnqueuedAt: now.UTC(),
ProcessAt: now,
Queue: "default",
Retry: defaultMaxRetry,
Timeout: defaultTimeout,
Deadline: noDeadline,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
Expand All @@ -70,11 +71,12 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
processAt: oneHourLater,
opts: []Option{},
wantRes: &Result{
ProcessAt: oneHourLater,
Queue: "default",
Retry: defaultMaxRetry,
Timeout: defaultTimeout,
Deadline: noDeadline,
EnqueuedAt: now.UTC(),
ProcessAt: oneHourLater,
Queue: "default",
Retry: defaultMaxRetry,
Timeout: defaultTimeout,
Deadline: noDeadline,
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
Expand Down Expand Up @@ -111,8 +113,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueueAt(processAt, task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotRes, tc.wantRes, diff)
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.processAt, gotRes, tc.wantRes, diff)
}

for qname, want := range tc.wantPending {
Expand Down Expand Up @@ -366,7 +368,7 @@ func TestClientEnqueue(t *testing.T) {
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(Result{}, "ID"),
cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
Expand Down Expand Up @@ -471,12 +473,12 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(Result{}, "ID"),
cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueueIn(delay, task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotRes, tc.wantRes, diff)
t.Errorf("%s;\nEnqueue(task, ProcessIn(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.delay, gotRes, tc.wantRes, diff)
}

for qname, want := range tc.wantPending {
Expand Down Expand Up @@ -617,7 +619,7 @@ func TestClientDefaultOptions(t *testing.T) {
t.Fatal(err)
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(Result{}, "ID"),
cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
Expand Down
75 changes: 75 additions & 0 deletions forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
"sync"
"time"

"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)

// A forwarder is responsible for moving scheduled and retry tasks to pending state
// so that the tasks get processed by the workers.
type forwarder struct {
logger *log.Logger
broker base.Broker

// channel to communicate back to the long running "forwarder" goroutine.
done chan struct{}

// list of queue names to check and enqueue.
queues []string

// poll interval on average
avgInterval time.Duration
}

type forwarderParams struct {
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
}

func newForwarder(params forwarderParams) *forwarder {
return &forwarder{
logger: params.logger,
broker: params.broker,
done: make(chan struct{}),
queues: params.queues,
avgInterval: params.interval,
}
}

func (f *forwarder) terminate() {
f.logger.Debug("Forwarder shutting down...")
// Signal the forwarder goroutine to stop polling.
f.done <- struct{}{}
}

// start starts the "forwarder" goroutine.
func (f *forwarder) start(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-f.done:
f.logger.Debug("Forwarder done")
return
case <-time.After(f.avgInterval):
f.exec()
}
}
}()
}

func (f *forwarder) exec() {
if err := f.broker.CheckAndEnqueue(f.queues...); err != nil {
f.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
}
}
137 changes: 137 additions & 0 deletions forwarder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)

func TestForwarder(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second
s := newForwarder(forwarderParams{
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "critical"},
interval: pollInterval,
})
t1 := h.NewTaskMessageWithQueue("gen_thumbnail", nil, "default")
t2 := h.NewTaskMessageWithQueue("send_email", nil, "critical")
t3 := h.NewTaskMessageWithQueue("reindex", nil, "default")
t4 := h.NewTaskMessageWithQueue("sync", nil, "critical")
now := time.Now()

tests := []struct {
initScheduled map[string][]base.Z // scheduled queue initial state
initRetry map[string][]base.Z // retry queue initial state
initPending map[string][]*base.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state
wantScheduled map[string][]*base.TaskMessage // schedule queue final state
wantRetry map[string][]*base.TaskMessage // retry queue final state
wantPending map[string][]*base.TaskMessage // default queue final state
}{
{
initScheduled: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(time.Hour).Unix()}},
"critical": {{Message: t2, Score: now.Add(-2 * time.Second).Unix()}},
},
initRetry: map[string][]base.Z{
"default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}},
"critical": {},
},
initPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {t4},
},
wait: pollInterval * 2,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t3},
"critical": {t2, t4},
},
},
{
initScheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Unix()},
{Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
},
"critical": {
{Message: t2, Score: now.Add(-2 * time.Second).Unix()},
},
},
initRetry: map[string][]base.Z{
"default": {},
"critical": {},
},
initPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {t4},
},
wait: pollInterval * 2,
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t3},
"critical": {t2, t4},
},
},
}

for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue
h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue
h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue

var wg sync.WaitGroup
s.start(&wg)
time.Sleep(tc.wait)
s.terminate()

for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}

for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.RetryKey(qname), diff)
}
}

for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/go-redis/redis/v7 v7.4.0
github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.3.1
go.uber.org/goleak v0.10.0
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
Expand Down
18 changes: 18 additions & 0 deletions internal/asynqtest/asynqtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ var SortWorkerInfoOpt = cmp.Transformer("SortWorkerInfo", func(in []*base.Worker
return out
})

// SortSchedulerEntryOpt is a cmp.Option to sort base.SchedulerEntry for comparing slice of entries.
var SortSchedulerEntryOpt = cmp.Transformer("SortSchedulerEntry", func(in []*base.SchedulerEntry) []*base.SchedulerEntry {
out := append([]*base.SchedulerEntry(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].Spec < out[j].Spec
})
return out
})

// SortSchedulerEnqueueEventOpt is a cmp.Option to sort base.SchedulerEnqueueEvent for comparing slice of events.
var SortSchedulerEnqueueEventOpt = cmp.Transformer("SortSchedulerEnqueueEvent", func(in []*base.SchedulerEnqueueEvent) []*base.SchedulerEnqueueEvent {
out := append([]*base.SchedulerEnqueueEvent(nil), in...)
sort.Slice(out, func(i, j int) bool {
return out[i].EnqueuedAt.Unix() < out[j].EnqueuedAt.Unix()
})
return out
})

// SortStringSliceOpt is a cmp.Option to sort string slice.
var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string {
out := append([]string(nil), in...)
Expand Down
Loading

0 comments on commit 50e7f38

Please sign in to comment.