Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DST horizontal partitioning #556

Merged
merged 13 commits into from
Mar 11, 2025
4 changes: 4 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func RunDSTCmd() *cobra.Command {
scenario string
visualizationPath string
verbose bool
printOps bool

reqsPerTick = util.NewRangeIntFlag(1, 25)
ids = util.NewRangeIntFlag(1, 25)
Expand Down Expand Up @@ -170,6 +171,7 @@ func RunDSTCmd() *cobra.Command {
Timeout: timeout,
VisualizationPath: visualizationPath,
Verbose: verbose,
PrintOps: printOps,
TimeElapsedPerTick: 1000, // ms
TimeoutTicks: t,
ReqsPerTick: func() int { return reqsPerTick.Resolve(r) },
Expand Down Expand Up @@ -210,6 +212,7 @@ func RunDSTCmd() *cobra.Command {
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: default, fault, lazy")
cmd.Flags().StringVar(&visualizationPath, "visualization-path", "dst.html", "porcupine visualization file path")
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "log additional information when run is non linearizable")
cmd.Flags().BoolVar(&printOps, "print-ops", true, "log the request/response pairs of a run.")
cmd.Flags().Var(reqsPerTick, "reqs-per-tick", "number of requests per tick")
cmd.Flags().Var(ids, "ids", "promise id set size")
cmd.Flags().Var(idempotencyKeys, "idempotency-keys", "idempotency key set size")
Expand All @@ -220,6 +223,7 @@ func RunDSTCmd() *cobra.Command {

// bind config
_ = config.BindDST(cmd)
cmd.SilenceUsage = true

cmd.Flags().SortFlags = false

Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/completeTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func CompleteTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
}

if t.State == task.Completed || t.State == task.Timedout {
status = t_api.StatusTaskAlreadyCompleted
status = t_api.StatusOK
} else if t.State == task.Init || t.State == task.Enqueued {
status = t_api.StatusTaskInvalidState
} else if t.Counter != r.CompleteTask.Counter {
Expand Down
10 changes: 7 additions & 3 deletions internal/app/coroutines/createCallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateCallback.PromiseId, r.CreateCallback.Id)
cbId := callbackId(r.CreateCallback.RootPromiseId, r.CreateCallback.PromiseId)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand All @@ -76,7 +76,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
Id: cbId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand Down Expand Up @@ -104,7 +104,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
if result.RowsAffected == 1 {
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
Id: cbId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand Down Expand Up @@ -138,3 +138,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
util.Assert(res != nil, "response must not be nil")
return res, nil
}

func callbackId(rootPromiseId, promiseId string) string {
return fmt.Sprintf("__resume:%s:%s", rootPromiseId, promiseId)
}
93 changes: 66 additions & 27 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
Expand All @@ -24,6 +25,7 @@ func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completio
util.Assert(r.CreatePromiseAndTask.Promise.Timeout == r.CreatePromiseAndTask.Task.Timeout, "timeouts must match")

return createPromiseAndTask(c, r, r.CreatePromiseAndTask.Promise, &t_aio.CreateTaskCommand{
Id: invokeTaskId(r.CreatePromiseAndTask.Task.PromiseId),
Recv: nil,
Mesg: &message.Mesg{Type: message.Invoke, Root: r.CreatePromiseAndTask.Task.PromiseId, Leaf: r.CreatePromiseAndTask.Task.PromiseId},
Timeout: r.CreatePromiseAndTask.Task.Timeout,
Expand Down Expand Up @@ -93,8 +95,14 @@ func createPromiseAndTask(
if err != nil {
return nil, err
}
var promiseRowsAffected int64
if taskCmd == nil {
promiseRowsAffected = completion.Store.Results[0].CreatePromise.RowsAffected
} else {
promiseRowsAffected = completion.Store.Results[0].CreatePromiseAndTask.PromiseRowsAffected
}

if completion.Store.Results[0].CreatePromise.RowsAffected == 0 {
if promiseRowsAffected == 0 {
// It's possible that the promise was created by another coroutine
// while we were creating. In that case, we should just retry.
return createPromiseAndTask(c, r, createPromiseReq, taskCmd)
Expand All @@ -117,20 +125,21 @@ func createPromiseAndTask(
switch r.Kind {
case t_api.CreatePromiseAndTask:
util.Assert(taskCmd != nil, "create task cmd must not be nil")
util.Assert(completion.Store.Results[1].Kind == t_aio.CreateTask, "completion must be create task")
util.Assert(completion.Store.Results[0].Kind == t_aio.CreatePromiseAndTask, "completion must be createPromiseAndTask")

t = &task.Task{
Id: completion.Store.Results[1].CreateTask.LastInsertId,
ProcessId: taskCmd.ProcessId,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
Id: taskCmd.Id,
ProcessId: taskCmd.ProcessId,
RootPromiseId: p.Id,
State: taskCmd.State,
Recv: taskCmd.Recv,
Mesg: taskCmd.Mesg,
Timeout: taskCmd.Timeout,
Counter: 1,
Attempt: 0,
Ttl: taskCmd.Ttl,
ExpiresAt: taskCmd.ExpiresAt,
CreatedOn: &taskCmd.CreatedOn,
}
}
} else {
Expand Down Expand Up @@ -203,12 +212,26 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
promiseCmd.Tags = map[string]string{}
}

isCreatePromiseAndTask := taskCmd != nil

return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) {
// add create promise command
commands := []*t_aio.Command{{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
}}
commands := []*t_aio.Command{}

// Combine both commands if taskCmd is not null otherwise add just the CreatePromiseCmd
if isCreatePromiseAndTask {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
PromiseCommand: promiseCmd,
TaskCommand: taskCmd,
},
})
} else {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
})
}

// check router to see if a task needs to be created
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Expand All @@ -231,33 +254,36 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err)
}

if taskCmd != nil && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise when creating a task", "cmd", promiseCmd)
if isCreatePromiseAndTask && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise with router when creating a task", "cmd", promiseCmd)
return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err)
}

if err == nil && completion.Router.Matched {
util.Assert(completion.Router.Recv != nil, "recv must not be nil")

// If there is a taskCmd just update the Recv otherwise create a tasks for the match
if taskCmd != nil {
if isCreatePromiseAndTask {
// Note: we are mutating the taskCmd that is already merged with the createPromiseCmd
taskCmd.Recv = completion.Router.Recv
} else {
taskCmd = &t_aio.CreateTaskCommand{
Id: invokeTaskId(promiseCmd.Id),
Recv: completion.Router.Recv,
Mesg: &message.Mesg{Type: message.Invoke, Root: promiseCmd.Id, Leaf: promiseCmd.Id},
Timeout: promiseCmd.Timeout,
State: task.Init,
CreatedOn: promiseCmd.CreatedOn,
}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})

}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})
}

// add additional commands
Expand All @@ -281,8 +307,21 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands")
util.Assert(completion.Store.Results[0].CreatePromise.RowsAffected == 0 || completion.Store.Results[0].CreatePromise.RowsAffected == 1, "result must return 0 or 1 rows")
if isCreatePromiseAndTask {
promiseAndTaskResult := completion.Store.Results[0].CreatePromiseAndTask
util.Assert(promiseAndTaskResult.PromiseRowsAffected == 0 || promiseAndTaskResult.PromiseRowsAffected == 1, "Creating promise result must return 0 or 1 rows")
if promiseAndTaskResult.PromiseRowsAffected == 0 {
util.Assert(promiseAndTaskResult.TaskRowsAffected == 0, "If not promise was created a task must have not been created")
}
} else {
createPromiseResult := completion.Store.Results[0].CreatePromise
util.Assert(createPromiseResult.RowsAffected == 0 || createPromiseResult.RowsAffected == 1, "CreatePromise result must return 0 or 1 rows")
}
Comment on lines +310 to +319
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general note, I might want to come back to this code and refactor it in a cleaner way. the isCreatePromiseAndTask every where makes me believe there is a cleaner approach to it.


return completion, nil
}
}

func invokeTaskId(promiseId string) string {
return fmt.Sprintf("__invoke:%s", promiseId)
}
6 changes: 5 additions & 1 deletion internal/app/coroutines/createSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion,

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
callbackId := subscriptionId(r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand Down Expand Up @@ -138,3 +138,7 @@ func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion,
util.Assert(res != nil, "response must not be nil")
return res, nil
}

func subscriptionId(promiseId, customId string) string {
return fmt.Sprintf("__notify:%s:%s", promiseId, customId)
}
12 changes: 11 additions & 1 deletion internal/app/subsystems/aio/sender/sender_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/pkg/message"
)

// Config
Expand Down Expand Up @@ -52,8 +53,17 @@ func (s *SenderDST) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Completion])
for i, sqe := range sqes {
var completion *t_aio.SenderCompletion

mesgType := sqe.Submission.Sender.Task.Mesg.Type

var obj any
if mesgType == message.Notify {
obj = sqe.Submission.Sender.Promise
} else {
obj = sqe.Submission.Sender.Task
}

select {
case s.backchannel <- sqe.Submission.Sender.Task:
case s.backchannel <- obj:
completion = &t_aio.SenderCompletion{
Success: s.r.Float64() < s.config.P,
}
Expand Down
25 changes: 13 additions & 12 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ const (
CREATE INDEX IF NOT EXISTS idx_locks_expires_at ON locks(expires_at);

CREATE TABLE IF NOT EXISTS tasks (
id SERIAL PRIMARY KEY,
id TEXT,
sort_id SERIAL,
process_id TEXT,
state INTEGER DEFAULT 1,
root_promise_id TEXT,
Expand All @@ -103,7 +104,8 @@ const (
ttl INTEGER DEFAULT 0,
expires_at BIGINT DEFAULT 0,
created_on BIGINT,
completed_on BIGINT
completed_on BIGINT,
PRIMARY KEY(id)
);

CREATE INDEX IF NOT EXISTS idx_tasks_process_id ON tasks(process_id);
Expand Down Expand Up @@ -297,7 +299,7 @@ const (
FROM tasks
WHERE
state & $1 != 0 AND (expires_at <= $2 OR timeout <= $2)
ORDER BY root_promise_id, id
ORDER BY root_promise_id, sort_id ASC
LIMIT $3`

TASK_SELECT_ENQUEUEABLE_STATEMENT = `
Expand All @@ -324,21 +326,21 @@ const (
WHERE t2.root_promise_id = t1.root_promise_id
AND t2.state in (2, 4) -- 2 -> Enqueue, 4 -> Claimed
)
ORDER BY root_promise_id, id
ORDER BY root_promise_id, sort_id ASC
LIMIT $1`

TASK_INSERT_STATEMENT = `
INSERT INTO tasks
(recv, mesg, timeout, process_id, state, root_promise_id, ttl, expires_at, created_on)
(id, recv, mesg, timeout, process_id, state, root_promise_id, ttl, expires_at, created_on)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id`
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT(id) DO NOTHING`

TASK_INSERT_ALL_STATEMENT = `
INSERT INTO tasks
(recv, mesg, timeout, root_promise_id, created_on)
(id, recv, mesg, timeout, root_promise_id, created_on)
SELECT
recv, mesg, timeout, root_promise_id, $1
id, recv, mesg, timeout, root_promise_id, $1
FROM
callbacks
WHERE
Expand Down Expand Up @@ -995,7 +997,7 @@ func (w *PostgresStoreWorker) searchPromises(tx *sql.Tx, cmd *t_aio.SearchPromis
}, nil
}

func (w *PostgresStoreWorker) createPromise(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreatePromiseCommand) (*t_aio.Result, error) {
func (w *PostgresStoreWorker) createPromise(_ *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreatePromiseCommand) (*t_aio.Result, error) {
util.Assert(cmd.Param.Headers != nil, "param headers must not be nil")
util.Assert(cmd.Param.Data != nil, "param data must not be nil")
util.Assert(cmd.Tags != nil, "tags must not be nil")
Expand Down Expand Up @@ -1597,7 +1599,7 @@ func (w *PostgresStoreWorker) createTask(tx *sql.Tx, cmd *t_aio.CreateTaskComman

var lastInsertId string
rowsAffected := int64(1)
row := tx.QueryRow(TASK_INSERT_STATEMENT, cmd.Recv, mesg, cmd.Timeout, cmd.ProcessId, cmd.State, cmd.Mesg.Root, cmd.Ttl, cmd.ExpiresAt, cmd.CreatedOn)
row := tx.QueryRow(TASK_INSERT_STATEMENT, cmd.Id, cmd.Recv, mesg, cmd.Timeout, cmd.ProcessId, cmd.State, cmd.Mesg.Root, cmd.Ttl, cmd.ExpiresAt, cmd.CreatedOn)

if err := row.Scan(&lastInsertId); err != nil {
if err == sql.ErrNoRows {
Expand All @@ -1611,7 +1613,6 @@ func (w *PostgresStoreWorker) createTask(tx *sql.Tx, cmd *t_aio.CreateTaskComman
Kind: t_aio.CreateTask,
CreateTask: &t_aio.AlterTasksResult{
RowsAffected: rowsAffected,
LastInsertId: lastInsertId,
},
}, nil
}
Expand Down
Loading
Loading