Skip to content

Commit

Permalink
Avillega/deduplicate callbacks (#478)
Browse files Browse the repository at this point in the history
Add callback id to requests to create callbacks, this allows us to 
deduplicate callbacks for the same promises.
  • Loading branch information
avillega authored Dec 10, 2024
1 parent 95c4887 commit 1f1a419
Show file tree
Hide file tree
Showing 30 changed files with 682 additions and 1,100 deletions.
3 changes: 1 addition & 2 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dst

import (
"errors"
"fmt"
"log/slog"
"math/rand" // nosemgrep
Expand Down Expand Up @@ -195,7 +194,7 @@ func RunDSTCmd() *cobra.Command {
}

if !ok {
return errors.New("DST failed")
return fmt.Errorf("DST failed for seed='%d'", seed)
}

return nil
Expand Down
55 changes: 27 additions & 28 deletions internal/app/coroutines/createCallback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
Expand Down Expand Up @@ -52,6 +53,10 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

// If the callback was already created return 200 and an empty callback
var cb *callback.Callback
status := t_api.StatusOK

if p.State == promise.Pending {
mesg := &message.Mesg{
Type: message.Resume,
Expand All @@ -61,6 +66,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateCallback.PromiseId, r.CreateCallback.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand All @@ -70,6 +76,7 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Expand All @@ -95,37 +102,29 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows")

if result.RowsAffected == 1 {
res = &t_api.Response{
Kind: t_api.CreateCallback,
Tags: r.Tags,
CreateCallback: &t_api.CreateCallbackResponse{
Status: t_api.StatusCreated,
Callback: &callback.Callback{
Id: result.LastInsertId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Timeout: r.CreateCallback.Timeout,
CreatedOn: createdOn,
},
Promise: p,
},
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
PromiseId: r.CreateCallback.PromiseId,
Recv: r.CreateCallback.Recv,
Mesg: mesg,
Timeout: r.CreateCallback.Timeout,
CreatedOn: createdOn,
}
} else {
return CreateCallback(c, r)
}
} else {
res = &t_api.Response{
Kind: t_api.CreateCallback,
Tags: r.Tags,
CreateCallback: &t_api.CreateCallbackResponse{
// ok indicates that the promise is completed and the process
// may continue
Status: t_api.StatusOK,
Promise: p,
},
}
}

res = &t_api.Response{
Kind: t_api.CreateCallback,
Tags: r.Tags,
CreateCallback: &t_api.CreateCallbackResponse{
// Status could be StatusOk or StatusCreated if the Callback Id was already present
Status: status,
Callback: cb,
Promise: p,
},
}

} else {
res = &t_api.Response{
Kind: t_api.CreateCallback,
Expand Down
18 changes: 11 additions & 7 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
Expand Down Expand Up @@ -46,6 +47,7 @@ func CreatePromiseAndCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Compl
return createPromiseAndTaskOrCallback(c, r, r.CreatePromiseAndCallback.Promise, &t_aio.Command{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: fmt.Sprintf("%s.%s", r.CreatePromiseAndCallback.Promise.Id, r.CreatePromiseAndCallback.Callback.Id),
PromiseId: r.CreatePromiseAndCallback.Callback.PromiseId,
Recv: r.CreatePromiseAndCallback.Callback.Recv,
Mesg: &message.Mesg{Type: message.Resume, Root: r.CreatePromiseAndCallback.Callback.RootPromiseId, Leaf: r.CreatePromiseAndCallback.Callback.PromiseId},
Expand Down Expand Up @@ -159,13 +161,15 @@ func createPromiseAndTaskOrCallback(
util.Assert(completion.Store.Results[1].Kind == t_aio.CreateCallback, "completion must be create callback")
cmd := additionalCmds[0].CreateCallback

cb = &callback.Callback{
Id: completion.Store.Results[1].CreateCallback.LastInsertId,
PromiseId: cmd.PromiseId,
Recv: cmd.Recv,
Mesg: cmd.Mesg,
Timeout: cmd.Timeout,
CreatedOn: cmd.CreatedOn,
if completion.Store.Results[1].CreateCallback.RowsAffected != 0 {
cb = &callback.Callback{
Id: cmd.Id,
PromiseId: cmd.PromiseId,
Recv: cmd.Recv,
Mesg: cmd.Mesg,
Timeout: cmd.Timeout,
CreatedOn: cmd.CreatedOn,
}
}
}
} else {
Expand Down
41 changes: 24 additions & 17 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id);
CREATE TABLE IF NOT EXISTS callbacks (
id SERIAL PRIMARY KEY,
id TEXT UNIQUE,
promise_id TEXT,
root_promise_id TEXT,
recv BYTEA,
Expand Down Expand Up @@ -174,12 +174,13 @@ const (

CALLBACK_INSERT_STATEMENT = `
INSERT INTO callbacks
(promise_id, root_promise_id, recv, mesg, timeout, created_on)
(id, promise_id, root_promise_id, recv, mesg, timeout, created_on)
SELECT
$1, $2, $3, $4, $5, $6
$1, $2, $3, $4, $5, $6, $7
WHERE EXISTS
(SELECT 1 FROM promises WHERE id = $1 AND state = 1)
RETURNING id`
(SELECT 1 FROM promises WHERE id = $2 AND state = 1)
AND NOT EXISTS
(SELECT 1 FROM callbacks WHERE id = $1)`

CALLBACK_DELETE_STATEMENT = `
DELETE FROM callbacks WHERE promise_id = $1`
Expand Down Expand Up @@ -578,6 +579,7 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.
// Lazily defined prepared statements
var promiseInsertStmt *sql.Stmt
var promiseUpdateStmt *sql.Stmt
var callbackInsertStmt *sql.Stmt
var callbackDeleteStmt *sql.Stmt
var scheduleInsertStmt *sql.Stmt
var scheduleUpdateStmt *sql.Stmt
Expand Down Expand Up @@ -637,8 +639,16 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.

// Callbacks
case t_aio.CreateCallback:
if callbackInsertStmt == nil {
callbackInsertStmt, err = tx.Prepare(CALLBACK_INSERT_STATEMENT)
if err != nil {
return nil, err
}
defer callbackInsertStmt.Close()
}

util.Assert(command.CreateCallback != nil, "command must not be nil")
results[i][j], err = w.createCallback(tx, command.CreateCallback)
results[i][j], err = w.createCallback(tx, callbackInsertStmt, command.CreateCallback)
case t_aio.DeleteCallbacks:
if callbackDeleteStmt == nil {
callbackDeleteStmt, err = tx.Prepare(CALLBACK_DELETE_STATEMENT)
Expand Down Expand Up @@ -1050,7 +1060,7 @@ func (w *PostgresStoreWorker) updatePromise(tx *sql.Tx, stmt *sql.Stmt, cmd *t_a

// Callbacks

func (w *PostgresStoreWorker) createCallback(tx *sql.Tx, cmd *t_aio.CreateCallbackCommand) (*t_aio.Result, error) {
func (w *PostgresStoreWorker) createCallback(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateCallbackCommand) (*t_aio.Result, error) {
util.Assert(cmd.Recv != nil, "recv must not be nil")
util.Assert(cmd.Mesg != nil, "mesg must not be nil")

Expand All @@ -1059,23 +1069,20 @@ func (w *PostgresStoreWorker) createCallback(tx *sql.Tx, cmd *t_aio.CreateCallba
return nil, err
}

var lastInsertId string
rowsAffected := int64(1)
row := tx.QueryRow(CALLBACK_INSERT_STATEMENT, cmd.PromiseId, cmd.Mesg.Root, cmd.Recv, mesg, cmd.Timeout, cmd.CreatedOn)
res, err := stmt.Exec(cmd.Id, cmd.PromiseId, cmd.Mesg.Root, cmd.Recv, mesg, cmd.Timeout, cmd.CreatedOn)
if err != nil {
return nil, err
}

if err := row.Scan(&lastInsertId); err != nil {
if err == sql.ErrNoRows {
rowsAffected = 0
} else {
return nil, err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

return &t_aio.Result{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.AlterCallbacksResult{
RowsAffected: rowsAffected,
LastInsertId: lastInsertId,
},
}, nil
}
Expand Down
34 changes: 18 additions & 16 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
CREATE INDEX IF NOT EXISTS idx_promises_id ON promises(id);
CREATE TABLE IF NOT EXISTS callbacks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT UNIQUE,
promise_id TEXT,
root_promise_id TEXT,
recv BLOB,
Expand Down Expand Up @@ -162,11 +162,13 @@ const (

CALLBACK_INSERT_STATEMENT = `
INSERT INTO callbacks
(promise_id, root_promise_id, recv, mesg, timeout, created_on)
(id, promise_id, root_promise_id, recv, mesg, timeout, created_on)
SELECT
?, ?, ?, ?, ?, ?
?, ?, ?, ?, ?, ?, ?
WHERE EXISTS
(SELECT 1 FROM promises WHERE id = ? AND state = 1)`
(SELECT 1 FROM promises WHERE id = ? AND state = 1)
AND NOT EXISTS
(SELECT 1 FROM callbacks WHERE id = ?)`

CALLBACK_DELETE_STATEMENT = `
DELETE FROM callbacks WHERE promise_id = ?`
Expand Down Expand Up @@ -1035,31 +1037,31 @@ func (w *SqliteStoreWorker) createCallback(tx *sql.Tx, stmt *sql.Stmt, cmd *t_ai
return nil, err
}

res, err := stmt.Exec(cmd.PromiseId, cmd.Mesg.Root, cmd.Recv, mesg, cmd.Timeout, cmd.CreatedOn, cmd.PromiseId)
if err != nil {
return nil, err
}
res, err := stmt.Exec(
cmd.Id,
cmd.PromiseId,
cmd.Mesg.Root,
cmd.Recv,
mesg,
cmd.Timeout,
cmd.CreatedOn,
cmd.PromiseId,
cmd.Id,
)

rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

lastInsertId, err := res.LastInsertId()
rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

var lastInsertIdStr string
if rowsAffected != 0 {
lastInsertIdStr = strconv.FormatInt(lastInsertId, 10)
}

return &t_aio.Result{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.AlterCallbacksResult{
RowsAffected: rowsAffected,
LastInsertId: lastInsertIdStr,
},
}, nil
}
Expand Down
Loading

0 comments on commit 1f1a419

Please sign in to comment.