Skip to content

Commit

Permalink
Complete tasks based on root promise (#470)
Browse files Browse the repository at this point in the history
* Add storage support to complete many tasks based on root promise

* Temporarily disable DST for tasks.

We are working on making DST faster, until that work gets completed
we need to disable validations over tasks in order to merge this PR.

* Make the linter happy
  • Loading branch information
avillega authored Dec 4, 2024
1 parent 16d3a16 commit 95c4887
Show file tree
Hide file tree
Showing 7 changed files with 775 additions and 14 deletions.
14 changes: 11 additions & 3 deletions internal/app/coroutines/completePromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func completePromise(tags map[string]string, cmd *t_aio.UpdatePromiseCommand, ad
Kind: t_aio.UpdatePromise,
UpdatePromise: cmd,
},
{
Kind: t_aio.CompleteTasks,
CompleteTasks: &t_aio.CompleteTasksCommand{
RootPromiseId: cmd.Id,
CompletedOn: cmd.CompletedOn,
},
},
{
Kind: t_aio.CreateTasks,
CreateTasks: &t_aio.CreateTasksCommand{
Expand Down Expand Up @@ -192,9 +199,10 @@ func completePromise(tags map[string]string, cmd *t_aio.UpdatePromiseCommand, ad
util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands")
util.Assert(completion.Store.Results[0].UpdatePromise != nil, "result must not be nil")
util.Assert(completion.Store.Results[0].UpdatePromise.RowsAffected == 0 || completion.Store.Results[0].UpdatePromise.RowsAffected == 1, "result must return 0 or 1 rows")
util.Assert(completion.Store.Results[1].CreateTasks != nil, "result must not be nil")
util.Assert(completion.Store.Results[2].DeleteCallbacks != nil, "result must not be nil")
util.Assert(completion.Store.Results[1].CreateTasks.RowsAffected == completion.Store.Results[2].DeleteCallbacks.RowsAffected, "created rows must equal deleted rows")
util.Assert(completion.Store.Results[1].CompleteTasks != nil, "result must not be nil")
util.Assert(completion.Store.Results[2].CreateTasks != nil, "result must not be nil")
util.Assert(completion.Store.Results[3].DeleteCallbacks != nil, "result must not be nil")
util.Assert(completion.Store.Results[2].CreateTasks.RowsAffected == completion.Store.Results[3].DeleteCallbacks.RowsAffected, "created rows must equal deleted rows")

return completion.Store.Results[0].UpdatePromise.RowsAffected == 1, nil
}
Expand Down
39 changes: 39 additions & 0 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,14 @@ const (
WHERE
id = $8 AND state & $9 != 0 AND counter = $10`

TASK_COMPLETE_BY_ROOT_ID_STATEMENT = `
UPDATE
tasks
SET
state = 8, completed_on = $1 -- State = 8 -> Completed
WHERE
root_promise_id = $2 AND state in (1, 2) -- State in (Init, Enqueued)`

TASK_HEARTBEAT_STATEMENT = `
UPDATE
tasks
Expand Down Expand Up @@ -580,6 +588,7 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.
var lockTimeoutStmt *sql.Stmt
var tasksInsertStmt *sql.Stmt
var taskUpdateStmt *sql.Stmt
var tasksCompleteStmt *sql.Stmt
var taskHeartbeatStmt *sql.Stmt

// Results
Expand Down Expand Up @@ -770,6 +779,17 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.

util.Assert(command.UpdateTask != nil, "command must not be nil")
results[i][j], err = w.updateTask(tx, taskUpdateStmt, command.UpdateTask)
case t_aio.CompleteTasks:
if tasksCompleteStmt == nil {
tasksCompleteStmt, err = tx.Prepare(TASK_COMPLETE_BY_ROOT_ID_STATEMENT)
if err != nil {
return nil, err
}
defer tasksCompleteStmt.Close()
}

util.Assert(command.CompleteTasks != nil, "command must not be nil")
results[i][j], err = w.completeTasks(tx, tasksCompleteStmt, command.CompleteTasks)
case t_aio.HeartbeatTasks:
if taskHeartbeatStmt == nil {
taskHeartbeatStmt, err = tx.Prepare(TASK_HEARTBEAT_STATEMENT)
Expand Down Expand Up @@ -1608,6 +1628,25 @@ func (w *PostgresStoreWorker) createTasks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio
}, nil
}

func (w *PostgresStoreWorker) completeTasks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CompleteTasksCommand) (*t_aio.Result, error) {
res, err := stmt.Exec(cmd.CompletedOn, cmd.RootPromiseId)
if err != nil {
return nil, store.StoreErr(err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, store.StoreErr(err)
}

return &t_aio.Result{
Kind: t_aio.CompleteTasks,
CompleteTasks: &t_aio.AlterTasksResult{
RowsAffected: rowsAffected,
},
}, nil
}

func (w *PostgresStoreWorker) updateTask(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateTaskCommand) (*t_aio.Result, error) {
util.Assert(len(cmd.CurrentStates) > 0, "must provide at least one current state")

Expand Down
39 changes: 39 additions & 0 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ const (
WHERE
id = ? AND state & ? != 0 AND counter = ?`

TASK_COMPLETE_BY_ROOT_ID_STATEMENT = `
UPDATE
tasks
SET
state = 8, completed_on = ? -- State 8 -> Completed
WHERE
root_promise_id = ? AND state in (1, 2) -- State in (Init, Enqueued)`

TASK_HEARTBEAT_STATEMENT = `
UPDATE
tasks
Expand Down Expand Up @@ -535,6 +543,7 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Tr
var lockTimeoutStmt *sql.Stmt
var taskInsertStmt *sql.Stmt
var tasksInsertStmt *sql.Stmt
var tasksCompleteStmt *sql.Stmt
var taskUpdateStmt *sql.Stmt
var taskHeartbeatStmt *sql.Stmt

Expand Down Expand Up @@ -731,6 +740,17 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Tr

util.Assert(command.CreateTasks != nil, "command must not be nil")
results[i][j], err = w.createTasks(tx, tasksInsertStmt, command.CreateTasks)
case t_aio.CompleteTasks:
if tasksCompleteStmt == nil {
tasksCompleteStmt, err = tx.Prepare(TASK_COMPLETE_BY_ROOT_ID_STATEMENT)
if err != nil {
return nil, err
}
defer tasksCompleteStmt.Close()
}

util.Assert(command.CompleteTasks != nil, "command must not be nil")
results[i][j], err = w.completeTasks(tx, tasksCompleteStmt, command.CompleteTasks)
case t_aio.UpdateTask:
if taskUpdateStmt == nil {
taskUpdateStmt, err = tx.Prepare(TASK_UPDATE_STATEMENT)
Expand Down Expand Up @@ -1612,6 +1632,25 @@ func (w *SqliteStoreWorker) createTasks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.C
}, nil
}

func (w *SqliteStoreWorker) completeTasks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CompleteTasksCommand) (*t_aio.Result, error) {
res, err := stmt.Exec(cmd.CompletedOn, cmd.RootPromiseId)
if err != nil {
return nil, store.StoreErr(err)
}

rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, store.StoreErr(err)
}

return &t_aio.Result{
Kind: t_aio.CompleteTasks,
CompleteTasks: &t_aio.AlterTasksResult{
RowsAffected: rowsAffected,
},
}, nil
}

func (w *SqliteStoreWorker) updateTask(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.UpdateTaskCommand) (*t_aio.Result, error) {
util.Assert(len(cmd.CurrentStates) > 0, "must provide at least one current state")

Expand Down
Loading

0 comments on commit 95c4887

Please sign in to comment.