Skip to content

Commit

Permalink
Change TaskInfo to use public fields instead of methods
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Jun 29, 2021
1 parent e01c637 commit a9feec5
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 300 deletions.
116 changes: 66 additions & 50 deletions asynq.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,70 +38,86 @@ func NewTask(typename string, payload []byte) *Task {

// A TaskInfo describes a task and its metadata.
type TaskInfo struct {
msg *base.TaskMessage
state base.TaskState
nextProcessAt time.Time
}
// ID is the identifier of the task.
ID string

// ID returns the id of the task.
func (info *TaskInfo) ID() string { return info.msg.ID.String() }
// Queue is the name of the queue in which the task belongs.
Queue string

// Queue returns the name of the queue in which the task belongs.
func (info *TaskInfo) Queue() string { return info.msg.Queue }
// Type is the type name of the task.
Type string

// Type returns the type name of the task.
func (info *TaskInfo) Type() string { return info.msg.Type }
// Payload is the payload data of the task.
Payload []byte

// Payload returns the payload data of the task.
func (info *TaskInfo) Payload() []byte { return info.msg.Payload }
// State indicates the task state.
State TaskState

func (info *TaskInfo) State() TaskState {
switch info.state {
case base.TaskStateActive:
return TaskStateActive
case base.TaskStatePending:
return TaskStatePending
case base.TaskStateScheduled:
return TaskStateScheduled
case base.TaskStateRetry:
return TaskStateRetry
case base.TaskStateArchived:
return TaskStateArchived
}
panic("internal error: unknown state in TaskInfo")
}
// MaxRetry is the maximum number of times the task can be retried.
MaxRetry int

// Retried is the number of times the task has retried so far.
Retried int

// MaxRetry returns the maximum number of times the task can be retried.
func (info *TaskInfo) MaxRetry() int { return info.msg.Retry }
// LastErr is the error message from the last failure.
LastErr string

// Retried returns the number of times the task has retried so far.
func (info *TaskInfo) Retried() int { return info.msg.Retried }
// LastFailedAt is the time time of the last failure if any.
// If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}).
LastFailedAt time.Time

// LastErr returns the error message from the last failure.
// If the task has no failures, returns an empty string.
func (info *TaskInfo) LastErr() string { return info.msg.ErrorMsg }
// Timeout is the duration the task can be processed by Handler before being retried,
// zero if not specified
Timeout time.Duration

// LastFailedAt returns the time of the last failure if any.
// If the task has no failures, returns zero time.
func (info *TaskInfo) LastFailedAt() time.Time { return time.Unix(info.msg.LastFailedAt, 0) }
// Deadline is the deadline for the task, zero value if not specified.
Deadline time.Time

// Timeout returns the duration the task can be processed by Handler before being retried,
// zero if not specified
func (info *TaskInfo) Timeout() time.Duration {
return time.Duration(info.msg.Timeout) * time.Second
// NextProcessAt is the time the task is scheduled to be processed,
// zero if not applicable.
NextProcessAt time.Time
}

// Deadline returns the deadline for the task, zero value if not specified.
func (info *TaskInfo) Deadline() time.Time {
if info.msg.Deadline == 0 {
return time.Time{}
func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo {
info := TaskInfo{
ID: msg.ID.String(),
Queue: msg.Queue,
Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy?
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastErr: msg.ErrorMsg,
Timeout: time.Duration(msg.Timeout) * time.Second,
NextProcessAt: nextProcessAt,
}
if msg.LastFailedAt == 0 {
info.LastFailedAt = time.Time{}
} else {
info.LastFailedAt = time.Unix(msg.LastFailedAt, 0)
}
return time.Unix(info.msg.Deadline, 0)
}

// NextProcessAt returns the time the task is scheduled to be processed,
// zero if not applicable.
func (info *TaskInfo) NextProcessAt() time.Time { return info.nextProcessAt }
if msg.Deadline == 0 {
info.Deadline = time.Time{}
} else {
info.Deadline = time.Unix(msg.Deadline, 0)
}

switch state {
case base.TaskStateActive:
info.State = TaskStateActive
case base.TaskStatePending:
info.State = TaskStatePending
case base.TaskStateScheduled:
info.State = TaskStateScheduled
case base.TaskStateRetry:
info.State = TaskStateRetry
case base.TaskStateArchived:
info.State = TaskStateArchived
default:
panic(fmt.Sprintf("internal error: unknown state: %d", state))
}
return &info
}

// TaskState denotes the state of a task.
type TaskState int
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
case err != nil:
return nil, err
}
return &TaskInfo{msg, state, opt.processAt}, nil
return newTaskInfo(msg, state, opt.processAt), nil
}

func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
Expand Down
Loading

0 comments on commit a9feec5

Please sign in to comment.