Skip to content

Commit

Permalink
Remove Recv from create task request (#512)
Browse files Browse the repository at this point in the history
* Remove Recv from create task request

* Solve spooky action at distance where the set of tags to use in
the generator was being unintenionally mutated and causing all
the requests to include a tags only supposed to be part of a single
request.

* Remove debugging print stmts

* Remove unused import
  • Loading branch information
avillega authored Jan 9, 2025
1 parent f8b8f36 commit 41d13d9
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 226 deletions.
129 changes: 69 additions & 60 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,31 @@ import (
func CreatePromise(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromise, "must be create promise")

return createPromiseAndTask(c, r, r.CreatePromise)
return createPromiseAndTask(c, r, r.CreatePromise, nil)
}

func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromiseAndTask, "must be create promise and task")
util.Assert(r.CreatePromiseAndTask.Promise.Id == r.CreatePromiseAndTask.Task.PromiseId, "promise ids must match")
util.Assert(r.CreatePromiseAndTask.Promise.Timeout == r.CreatePromiseAndTask.Task.Timeout, "timeouts must match")

return createPromiseAndTask(c, r, r.CreatePromiseAndTask.Promise, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: &t_aio.CreateTaskCommand{
Recv: r.CreatePromiseAndTask.Task.Recv,
Mesg: &message.Mesg{Type: message.Invoke, Root: r.CreatePromiseAndTask.Task.PromiseId, Leaf: r.CreatePromiseAndTask.Task.PromiseId},
Timeout: r.CreatePromiseAndTask.Task.Timeout,
ProcessId: &r.CreatePromiseAndTask.Task.ProcessId,
State: task.Claimed,
Ttl: r.CreatePromiseAndTask.Task.Ttl,
ExpiresAt: c.Time() + int64(r.CreatePromiseAndTask.Task.Ttl),
CreatedOn: c.Time(),
},
return createPromiseAndTask(c, r, r.CreatePromiseAndTask.Promise, &t_aio.CreateTaskCommand{
Recv: nil,
Mesg: &message.Mesg{Type: message.Invoke, Root: r.CreatePromiseAndTask.Task.PromiseId, Leaf: r.CreatePromiseAndTask.Task.PromiseId},
Timeout: r.CreatePromiseAndTask.Task.Timeout,
ProcessId: &r.CreatePromiseAndTask.Task.ProcessId,
State: task.Claimed,
Ttl: r.CreatePromiseAndTask.Task.Ttl,
ExpiresAt: c.Time() + int64(r.CreatePromiseAndTask.Task.Ttl),
CreatedOn: c.Time(),
})
}

func createPromiseAndTask(
c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
r *t_api.Request,
createPromiseReq *t_api.CreatePromiseRequest,
additionalCmds ...*t_aio.Command,
taskCmd *t_aio.CreateTaskCommand,
) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromise || r.Kind == t_api.CreatePromiseAndTask, "must be create promise or variant")

Expand Down Expand Up @@ -82,7 +79,7 @@ func createPromiseAndTask(
util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows")

if result.RowsReturned == 0 {
cmd := &t_aio.CreatePromiseCommand{
promiseCmd := &t_aio.CreatePromiseCommand{
Id: createPromiseReq.Id,
Param: createPromiseReq.Param,
Timeout: createPromiseReq.Timeout,
Expand All @@ -92,49 +89,48 @@ func createPromiseAndTask(
}

// if the promise does not exist, create it
completion, err := gocoro.SpawnAndAwait(c, createPromise(r.Tags, cmd, additionalCmds...))
completion, err := gocoro.SpawnAndAwait(c, createPromise(r.Tags, promiseCmd, taskCmd))
if err != nil {
return nil, err
}

if completion.Store.Results[0].CreatePromise.RowsAffected == 0 {
// It's possible that the promise was created by another coroutine
// while we were creating. In that case, we should just retry.
return createPromiseAndTask(c, r, createPromiseReq, additionalCmds...)
return createPromiseAndTask(c, r, createPromiseReq, taskCmd)
}

// set status
status = t_api.StatusCreated

// set promise
p = &promise.Promise{
Id: cmd.Id,
Id: promiseCmd.Id,
State: promise.Pending,
Param: cmd.Param,
Timeout: cmd.Timeout,
IdempotencyKeyForCreate: cmd.IdempotencyKey,
Tags: cmd.Tags,
CreatedOn: &cmd.CreatedOn,
Param: promiseCmd.Param,
Timeout: promiseCmd.Timeout,
IdempotencyKeyForCreate: promiseCmd.IdempotencyKey,
Tags: promiseCmd.Tags,
CreatedOn: &promiseCmd.CreatedOn,
}

switch r.Kind {
case t_api.CreatePromiseAndTask:
util.Assert(additionalCmds[0].Kind == t_aio.CreateTask, "command must be create task")
util.Assert(taskCmd != nil, "create task cmd must not be nil")
util.Assert(completion.Store.Results[1].Kind == t_aio.CreateTask, "completion must be create task")
cmd := additionalCmds[0].CreateTask

t = &task.Task{
Id: completion.Store.Results[1].CreateTask.LastInsertId,
ProcessId: cmd.ProcessId,
State: cmd.State,
Recv: cmd.Recv,
Mesg: cmd.Mesg,
Timeout: cmd.Timeout,
ProcessId: taskCmd.ProcessId,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: cmd.Ttl,
ExpiresAt: cmd.ExpiresAt,
CreatedOn: &cmd.CreatedOn,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
}
}
} else {
Expand All @@ -161,7 +157,7 @@ func createPromiseAndTask(
if !ok {
// It's possible that the promise was created by another coroutine
// while we were timing out. In that case, we should just retry.
return createPromiseAndTask(c, r, createPromiseReq, additionalCmds...)
return createPromiseAndTask(c, r, createPromiseReq, taskCmd)
}

// set status to ok if not strict and idempotency keys match
Expand Down Expand Up @@ -196,64 +192,77 @@ func createPromiseAndTask(
return res, nil
}

func createPromise(tags map[string]string, cmd *t_aio.CreatePromiseCommand, additionalCmds ...*t_aio.Command) gocoro.CoroutineFunc[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion] {
if cmd.Param.Headers == nil {
cmd.Param.Headers = map[string]string{}
func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseCommand, taskCmd *t_aio.CreateTaskCommand, additionalCmds ...*t_aio.Command) gocoro.CoroutineFunc[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion] {
if promiseCmd.Param.Headers == nil {
promiseCmd.Param.Headers = map[string]string{}
}
if cmd.Param.Data == nil {
cmd.Param.Data = []byte{}
if promiseCmd.Param.Data == nil {
promiseCmd.Param.Data = []byte{}
}
if cmd.Tags == nil {
cmd.Tags = map[string]string{}
if promiseCmd.Tags == nil {
promiseCmd.Tags = map[string]string{}
}

return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) {
// add create promise command
commands := []*t_aio.Command{{
Kind: t_aio.CreatePromise,
CreatePromise: cmd,
CreatePromise: promiseCmd,
}}

// add additional commands
commands = append(commands, additionalCmds...)

// check router to see if a task needs to be created
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Router,
Tags: tags,
Router: &t_aio.RouterSubmission{
Promise: &promise.Promise{
Id: cmd.Id,
Id: promiseCmd.Id,
State: promise.Pending,
Param: cmd.Param,
Timeout: cmd.Timeout,
IdempotencyKeyForCreate: cmd.IdempotencyKey,
Tags: cmd.Tags,
CreatedOn: &cmd.CreatedOn,
Param: promiseCmd.Param,
Timeout: promiseCmd.Timeout,
IdempotencyKeyForCreate: promiseCmd.IdempotencyKey,
Tags: promiseCmd.Tags,
CreatedOn: &promiseCmd.CreatedOn,
},
},
})

if err != nil {
slog.Warn("failed to match promise", "cmd", cmd, "err", err)
slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err)
}

if taskCmd != nil && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise when creating a task", "cmd", promiseCmd, "taskCmd", taskCmd)
return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err)
}

if err == nil && completion.Router.Matched {
util.Assert(completion.Router.Recv != nil, "recv must not be nil")

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: &t_aio.CreateTaskCommand{
// If there is a taskCmd just update the Recv otherwise create a tasks for the match
if taskCmd != nil {
taskCmd.Recv = completion.Router.Recv
} else {
taskCmd = &t_aio.CreateTaskCommand{
Recv: completion.Router.Recv,
Mesg: &message.Mesg{Type: message.Invoke, Root: cmd.Id, Leaf: cmd.Id},
Timeout: cmd.Timeout,
Mesg: &message.Mesg{Type: message.Invoke, Root: promiseCmd.Id, Leaf: promiseCmd.Id},
Timeout: promiseCmd.Timeout,
State: task.Init,
CreatedOn: cmd.CreatedOn,
},
CreatedOn: promiseCmd.CreatedOn,
}

}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})
}

// add additional commands
commands = append(commands, additionalCmds...)

// yield commands
completion, err = gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/schedulePromises.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func SchedulePromises(config *system.Config, tags map[string]string) gocoro.Coro
CreatedOn: c.Time(),
}

awaiting[i] = gocoro.Spawn(c, createPromise(tags, commands[i], &t_aio.Command{
awaiting[i] = gocoro.Spawn(c, createPromise(tags, commands[i], nil, &t_aio.Command{
Kind: t_aio.UpdateSchedule,
UpdateSchedule: &t_aio.UpdateScheduleCommand{
Id: s.Id,
Expand Down
Loading

0 comments on commit 41d13d9

Please sign in to comment.