Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DST horizontal partitioning #556

Merged
merged 13 commits into from
Mar 11, 2025
4 changes: 4 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
scenario string
visualizationPath string
verbose bool
printOps bool

Check warning on line 36 in cmd/dst/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/dst/run.go#L36

Added line #L36 was not covered by tests

reqsPerTick = util.NewRangeIntFlag(1, 25)
ids = util.NewRangeIntFlag(1, 25)
Expand Down Expand Up @@ -170,6 +171,7 @@
Timeout: timeout,
VisualizationPath: visualizationPath,
Verbose: verbose,
PrintOps: printOps,

Check warning on line 174 in cmd/dst/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/dst/run.go#L174

Added line #L174 was not covered by tests
TimeElapsedPerTick: 1000, // ms
TimeoutTicks: t,
ReqsPerTick: func() int { return reqsPerTick.Resolve(r) },
Expand Down Expand Up @@ -210,6 +212,7 @@
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: default, fault, lazy")
cmd.Flags().StringVar(&visualizationPath, "visualization-path", "dst.html", "porcupine visualization file path")
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "log additional information when run is non linearizable")
cmd.Flags().BoolVar(&printOps, "print-ops", true, "log the request/response pairs of a run.")

Check warning on line 215 in cmd/dst/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/dst/run.go#L215

Added line #L215 was not covered by tests
cmd.Flags().Var(reqsPerTick, "reqs-per-tick", "number of requests per tick")
cmd.Flags().Var(ids, "ids", "promise id set size")
cmd.Flags().Var(idempotencyKeys, "idempotency-keys", "idempotency key set size")
Expand All @@ -220,6 +223,7 @@

// bind config
_ = config.BindDST(cmd)
cmd.SilenceUsage = true

Check warning on line 226 in cmd/dst/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/dst/run.go#L226

Added line #L226 was not covered by tests

cmd.Flags().SortFlags = false

Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/completeTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
}

if t.State == task.Completed || t.State == task.Timedout {
status = t_api.StatusTaskAlreadyCompleted
status = t_api.StatusOK

Check warning on line 50 in internal/app/coroutines/completeTask.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/completeTask.go#L50

Added line #L50 was not covered by tests
} else if t.State == task.Init || t.State == task.Enqueued {
status = t_api.StatusTaskInvalidState
} else if t.Counter != r.CompleteTask.Counter {
Expand Down
10 changes: 7 additions & 3 deletions internal/app/coroutines/createCallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateCallback.PromiseId, r.CreateCallback.Id)
cbId := callbackId(r.CreateCallback.RootPromiseId, r.CreateCallback.PromiseId)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand All @@ -76,7 +76,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
Id: cbId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand Down Expand Up @@ -104,7 +104,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
if result.RowsAffected == 1 {
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
Id: cbId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand Down Expand Up @@ -138,3 +138,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
util.Assert(res != nil, "response must not be nil")
return res, nil
}

func callbackId(rootPromiseId, promiseId string) string {
return fmt.Sprintf("__resume:%s:%s", rootPromiseId, promiseId)
}
93 changes: 66 additions & 27 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
Expand All @@ -24,6 +25,7 @@ func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completio
util.Assert(r.CreatePromiseAndTask.Promise.Timeout == r.CreatePromiseAndTask.Task.Timeout, "timeouts must match")

return createPromiseAndTask(c, r, r.CreatePromiseAndTask.Promise, &t_aio.CreateTaskCommand{
Id: invokeTaskId(r.CreatePromiseAndTask.Task.PromiseId),
Recv: nil,
Mesg: &message.Mesg{Type: message.Invoke, Root: r.CreatePromiseAndTask.Task.PromiseId, Leaf: r.CreatePromiseAndTask.Task.PromiseId},
Timeout: r.CreatePromiseAndTask.Task.Timeout,
Expand Down Expand Up @@ -93,8 +95,14 @@ func createPromiseAndTask(
if err != nil {
return nil, err
}
var promiseRowsAffected int64
if taskCmd == nil {
promiseRowsAffected = completion.Store.Results[0].CreatePromise.RowsAffected
} else {
promiseRowsAffected = completion.Store.Results[0].CreatePromiseAndTask.PromiseRowsAffected
}

if completion.Store.Results[0].CreatePromise.RowsAffected == 0 {
if promiseRowsAffected == 0 {
// It's possible that the promise was created by another coroutine
// while we were creating. In that case, we should just retry.
return createPromiseAndTask(c, r, createPromiseReq, taskCmd)
Expand All @@ -117,20 +125,21 @@ func createPromiseAndTask(
switch r.Kind {
case t_api.CreatePromiseAndTask:
util.Assert(taskCmd != nil, "create task cmd must not be nil")
util.Assert(completion.Store.Results[1].Kind == t_aio.CreateTask, "completion must be create task")
util.Assert(completion.Store.Results[0].Kind == t_aio.CreatePromiseAndTask, "completion must be createPromiseAndTask")

t = &task.Task{
Id: completion.Store.Results[1].CreateTask.LastInsertId,
ProcessId: taskCmd.ProcessId,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
Id: taskCmd.Id,
ProcessId: taskCmd.ProcessId,
RootPromiseId: p.Id,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
}
}
} else {
Expand Down Expand Up @@ -203,12 +212,26 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
promiseCmd.Tags = map[string]string{}
}

isCreatePromiseAndTask := taskCmd != nil

return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) {
// add create promise command
commands := []*t_aio.Command{{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
}}
commands := []*t_aio.Command{}

// Combine both commands if taskCmd is not null otherwise add just the CreatePromiseCmd
if isCreatePromiseAndTask {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
PromiseCommand: promiseCmd,
TaskCommand: taskCmd,
},
})
} else {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
})
}

// check router to see if a task needs to be created
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Expand All @@ -231,33 +254,36 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err)
}

if taskCmd != nil && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise when creating a task", "cmd", promiseCmd)
if isCreatePromiseAndTask && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise with router when creating a task", "cmd", promiseCmd)
return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err)
}

if err == nil && completion.Router.Matched {
util.Assert(completion.Router.Recv != nil, "recv must not be nil")

// If there is a taskCmd just update the Recv otherwise create a tasks for the match
if taskCmd != nil {
if isCreatePromiseAndTask {
// Note: we are mutating the taskCmd that is already merged with the createPromiseCmd
taskCmd.Recv = completion.Router.Recv
} else {
taskCmd = &t_aio.CreateTaskCommand{
Id: invokeTaskId(promiseCmd.Id),
Recv: completion.Router.Recv,
Mesg: &message.Mesg{Type: message.Invoke, Root: promiseCmd.Id, Leaf: promiseCmd.Id},
Timeout: promiseCmd.Timeout,
State: task.Init,
CreatedOn: promiseCmd.CreatedOn,
}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})

}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})
}

// add additional commands
Expand All @@ -281,8 +307,21 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands")
util.Assert(completion.Store.Results[0].CreatePromise.RowsAffected == 0 || completion.Store.Results[0].CreatePromise.RowsAffected == 1, "result must return 0 or 1 rows")
if isCreatePromiseAndTask {
promiseAndTaskResult := completion.Store.Results[0].CreatePromiseAndTask
util.Assert(promiseAndTaskResult.PromiseRowsAffected == 0 || promiseAndTaskResult.PromiseRowsAffected == 1, "Creating promise result must return 0 or 1 rows")
if promiseAndTaskResult.PromiseRowsAffected == 0 {
util.Assert(promiseAndTaskResult.TaskRowsAffected == 0, "If not promise was created a task must have not been created")
}
} else {
createPromiseResult := completion.Store.Results[0].CreatePromise
util.Assert(createPromiseResult.RowsAffected == 0 || createPromiseResult.RowsAffected == 1, "CreatePromise result must return 0 or 1 rows")
}
Comment on lines +310 to +319
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general note, I might want to come back to this code and refactor it in a cleaner way. the isCreatePromiseAndTask every where makes me believe there is a cleaner approach to it.


return completion, nil
}
}

func invokeTaskId(promiseId string) string {
return fmt.Sprintf("__invoke:%s", promiseId)
}
6 changes: 5 additions & 1 deletion internal/app/coroutines/createSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion,

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
callbackId := subscriptionId(r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand Down Expand Up @@ -138,3 +138,7 @@ func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion,
util.Assert(res != nil, "response must not be nil")
return res, nil
}

func subscriptionId(promiseId, customId string) string {
return fmt.Sprintf("__notify:%s:%s", promiseId, customId)
}
12 changes: 11 additions & 1 deletion internal/app/subsystems/aio/sender/sender_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/pkg/message"
)

// Config
Expand Down Expand Up @@ -52,8 +53,17 @@
for i, sqe := range sqes {
var completion *t_aio.SenderCompletion

mesgType := sqe.Submission.Sender.Task.Mesg.Type

var obj any
if mesgType == message.Notify {
obj = sqe.Submission.Sender.Promise
} else {
obj = sqe.Submission.Sender.Task
}

Check warning on line 63 in internal/app/subsystems/aio/sender/sender_dst.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender_dst.go#L56-L63

Added lines #L56 - L63 were not covered by tests

select {
case s.backchannel <- sqe.Submission.Sender.Task:
case s.backchannel <- obj:

Check warning on line 66 in internal/app/subsystems/aio/sender/sender_dst.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/sender/sender_dst.go#L66

Added line #L66 was not covered by tests
completion = &t_aio.SenderCompletion{
Success: s.r.Float64() < s.config.P,
}
Expand Down
Loading
Loading