Skip to content

Commit

Permalink
DST horizontal partitioning (#556)
Browse files Browse the repository at this point in the history
This PR contains several related changes to be able to make the DST run faster and enable validations between promises and tasks.

Make DST do horizontal partitions based on the rootPromiseId
Add many validations between tasks and promises as well as adjusting some others
Make a complete promise operation also complete all related tasks including the current claimed task
General small changes to the way we print and report DST results to make it easier to debug.
Make trying to complete a task that is already completed return a StatusOk response
Create a single CreatePromiseAndTask command that is aware of not creating a task in case a promise was not being able to be created.
  • Loading branch information
avillega authored Mar 11, 2025
1 parent 7195f7f commit f188e8b
Show file tree
Hide file tree
Showing 15 changed files with 865 additions and 370 deletions.
4 changes: 4 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func RunDSTCmd() *cobra.Command {
scenario string
visualizationPath string
verbose bool
printOps bool

reqsPerTick = util.NewRangeIntFlag(1, 25)
ids = util.NewRangeIntFlag(1, 25)
Expand Down Expand Up @@ -170,6 +171,7 @@ func RunDSTCmd() *cobra.Command {
Timeout: timeout,
VisualizationPath: visualizationPath,
Verbose: verbose,
PrintOps: printOps,
TimeElapsedPerTick: 1000, // ms
TimeoutTicks: t,
ReqsPerTick: func() int { return reqsPerTick.Resolve(r) },
Expand Down Expand Up @@ -210,6 +212,7 @@ func RunDSTCmd() *cobra.Command {
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: default, fault, lazy")
cmd.Flags().StringVar(&visualizationPath, "visualization-path", "dst.html", "porcupine visualization file path")
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "log additional information when run is non linearizable")
cmd.Flags().BoolVar(&printOps, "print-ops", true, "log the request/response pairs of a run.")
cmd.Flags().Var(reqsPerTick, "reqs-per-tick", "number of requests per tick")
cmd.Flags().Var(ids, "ids", "promise id set size")
cmd.Flags().Var(idempotencyKeys, "idempotency-keys", "idempotency key set size")
Expand All @@ -220,6 +223,7 @@ func RunDSTCmd() *cobra.Command {

// bind config
_ = config.BindDST(cmd)
cmd.SilenceUsage = true

cmd.Flags().SortFlags = false

Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/completeTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func CompleteTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
}

if t.State == task.Completed || t.State == task.Timedout {
status = t_api.StatusTaskAlreadyCompleted
status = t_api.StatusOK
} else if t.State == task.Init || t.State == task.Enqueued {
status = t_api.StatusTaskInvalidState
} else if t.Counter != r.CompleteTask.Counter {
Expand Down
10 changes: 7 additions & 3 deletions internal/app/coroutines/createCallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateCallback.PromiseId, r.CreateCallback.Id)
cbId := callbackId(r.CreateCallback.RootPromiseId, r.CreateCallback.PromiseId)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand All @@ -76,7 +76,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
Id: cbId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand Down Expand Up @@ -104,7 +104,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
if result.RowsAffected == 1 {
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
Id: cbId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand Down Expand Up @@ -138,3 +138,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
util.Assert(res != nil, "response must not be nil")
return res, nil
}

func callbackId(rootPromiseId, promiseId string) string {
return fmt.Sprintf("__resume:%s:%s", rootPromiseId, promiseId)
}
93 changes: 66 additions & 27 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
Expand All @@ -24,6 +25,7 @@ func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completio
util.Assert(r.CreatePromiseAndTask.Promise.Timeout == r.CreatePromiseAndTask.Task.Timeout, "timeouts must match")

return createPromiseAndTask(c, r, r.CreatePromiseAndTask.Promise, &t_aio.CreateTaskCommand{
Id: invokeTaskId(r.CreatePromiseAndTask.Task.PromiseId),
Recv: nil,
Mesg: &message.Mesg{Type: message.Invoke, Root: r.CreatePromiseAndTask.Task.PromiseId, Leaf: r.CreatePromiseAndTask.Task.PromiseId},
Timeout: r.CreatePromiseAndTask.Task.Timeout,
Expand Down Expand Up @@ -93,8 +95,14 @@ func createPromiseAndTask(
if err != nil {
return nil, err
}
var promiseRowsAffected int64
if taskCmd == nil {
promiseRowsAffected = completion.Store.Results[0].CreatePromise.RowsAffected
} else {
promiseRowsAffected = completion.Store.Results[0].CreatePromiseAndTask.PromiseRowsAffected
}

if completion.Store.Results[0].CreatePromise.RowsAffected == 0 {
if promiseRowsAffected == 0 {
// It's possible that the promise was created by another coroutine
// while we were creating. In that case, we should just retry.
return createPromiseAndTask(c, r, createPromiseReq, taskCmd)
Expand All @@ -117,20 +125,21 @@ func createPromiseAndTask(
switch r.Kind {
case t_api.CreatePromiseAndTask:
util.Assert(taskCmd != nil, "create task cmd must not be nil")
util.Assert(completion.Store.Results[1].Kind == t_aio.CreateTask, "completion must be create task")
util.Assert(completion.Store.Results[0].Kind == t_aio.CreatePromiseAndTask, "completion must be createPromiseAndTask")

t = &task.Task{
Id: completion.Store.Results[1].CreateTask.LastInsertId,
ProcessId: taskCmd.ProcessId,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
Id: taskCmd.Id,
ProcessId: taskCmd.ProcessId,
RootPromiseId: p.Id,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
}
}
} else {
Expand Down Expand Up @@ -203,12 +212,26 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
promiseCmd.Tags = map[string]string{}
}

isCreatePromiseAndTask := taskCmd != nil

return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) {
// add create promise command
commands := []*t_aio.Command{{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
}}
commands := []*t_aio.Command{}

// Combine both commands if taskCmd is not null otherwise add just the CreatePromiseCmd
if isCreatePromiseAndTask {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
PromiseCommand: promiseCmd,
TaskCommand: taskCmd,
},
})
} else {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
})
}

// check router to see if a task needs to be created
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Expand All @@ -231,33 +254,36 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err)
}

if taskCmd != nil && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise when creating a task", "cmd", promiseCmd)
if isCreatePromiseAndTask && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise with router when creating a task", "cmd", promiseCmd)
return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err)
}

if err == nil && completion.Router.Matched {
util.Assert(completion.Router.Recv != nil, "recv must not be nil")

// If there is a taskCmd just update the Recv otherwise create a tasks for the match
if taskCmd != nil {
if isCreatePromiseAndTask {
// Note: we are mutating the taskCmd that is already merged with the createPromiseCmd
taskCmd.Recv = completion.Router.Recv
} else {
taskCmd = &t_aio.CreateTaskCommand{
Id: invokeTaskId(promiseCmd.Id),
Recv: completion.Router.Recv,
Mesg: &message.Mesg{Type: message.Invoke, Root: promiseCmd.Id, Leaf: promiseCmd.Id},
Timeout: promiseCmd.Timeout,
State: task.Init,
CreatedOn: promiseCmd.CreatedOn,
}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})

}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})
}

// add additional commands
Expand All @@ -281,8 +307,21 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands")
util.Assert(completion.Store.Results[0].CreatePromise.RowsAffected == 0 || completion.Store.Results[0].CreatePromise.RowsAffected == 1, "result must return 0 or 1 rows")
if isCreatePromiseAndTask {
promiseAndTaskResult := completion.Store.Results[0].CreatePromiseAndTask
util.Assert(promiseAndTaskResult.PromiseRowsAffected == 0 || promiseAndTaskResult.PromiseRowsAffected == 1, "Creating promise result must return 0 or 1 rows")
if promiseAndTaskResult.PromiseRowsAffected == 0 {
util.Assert(promiseAndTaskResult.TaskRowsAffected == 0, "If not promise was created a task must have not been created")
}
} else {
createPromiseResult := completion.Store.Results[0].CreatePromise
util.Assert(createPromiseResult.RowsAffected == 0 || createPromiseResult.RowsAffected == 1, "CreatePromise result must return 0 or 1 rows")
}

return completion, nil
}
}

func invokeTaskId(promiseId string) string {
return fmt.Sprintf("__invoke:%s", promiseId)
}
6 changes: 5 additions & 1 deletion internal/app/coroutines/createSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion,

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
callbackId := subscriptionId(r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand Down Expand Up @@ -138,3 +138,7 @@ func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion,
util.Assert(res != nil, "response must not be nil")
return res, nil
}

func subscriptionId(promiseId, customId string) string {
return fmt.Sprintf("__notify:%s:%s", promiseId, customId)
}
12 changes: 11 additions & 1 deletion internal/app/subsystems/aio/sender/sender_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/pkg/message"
)

// Config
Expand Down Expand Up @@ -52,8 +53,17 @@ func (s *SenderDST) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Completion])
for i, sqe := range sqes {
var completion *t_aio.SenderCompletion

mesgType := sqe.Submission.Sender.Task.Mesg.Type

var obj any
if mesgType == message.Notify {
obj = sqe.Submission.Sender.Promise
} else {
obj = sqe.Submission.Sender.Task
}

select {
case s.backchannel <- sqe.Submission.Sender.Task:
case s.backchannel <- obj:
completion = &t_aio.SenderCompletion{
Success: s.r.Float64() < s.config.P,
}
Expand Down
Loading

0 comments on commit f188e8b

Please sign in to comment.