Skip to content

Commit

Permalink
Allow configuration of DelayedTaskCheckInterval
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee authored Jan 3, 2022
1 parent 2d01705 commit ce46b07
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
vendor
# Binaries for programs and plugins
*.exe
*.exe~
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- The `asynq stats` command now supports a `--json` option, making its output a JSON object
- Introduced new configuration for `DelayedTaskCheckInterval`. See [godoc](https://godoc.org/github.com/hibiken/asynq) for more details.

## [0.20.0] - 2021-12-19

Expand Down
14 changes: 13 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ type Config struct {
//
// If unset or zero, the interval is set to 15 seconds.
HealthCheckInterval time.Duration

// DelayedTaskCheckInterval specifies the interval between checks run on 'scheduled' and 'retry'
// tasks, and forwarding them to 'pending' state if they are ready to be processed.
//
// If unset or zero, the interval is set to 5 seconds.
DelayedTaskCheckInterval time.Duration
}

// An ErrorHandler handles an error occured during task processing.
Expand Down Expand Up @@ -287,6 +293,8 @@ const (
defaultShutdownTimeout = 8 * time.Second

defaultHealthCheckInterval = 15 * time.Second

defaultDelayedTaskCheckInterval = 5 * time.Second
)

// NewServer returns a new Server given a redis connection option
Expand Down Expand Up @@ -362,11 +370,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
starting: starting,
finished: finished,
})
delayedTaskCheckInterval := cfg.DelayedTaskCheckInterval
if delayedTaskCheckInterval == 0 {
delayedTaskCheckInterval = defaultDelayedTaskCheckInterval
}
forwarder := newForwarder(forwarderParams{
logger: logger,
broker: rdb,
queues: qnames,
interval: 5 * time.Second,
interval: delayedTaskCheckInterval,
})
subscriber := newSubscriber(subscriberParams{
logger: logger,
Expand Down

0 comments on commit ce46b07

Please sign in to comment.