Skip to content

Commit

Permalink
Should we drop support for postgres?
Browse files Browse the repository at this point in the history
  • Loading branch information
avillega committed Mar 7, 2025
1 parent c913c41 commit 0199beb
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.
var lockHeartbeatStmt *sql.Stmt
var lockTimeoutStmt *sql.Stmt
var tasksInsertStmt *sql.Stmt
var taskInsertStmt *sql.Stmt
var taskUpdateStmt *sql.Stmt
var tasksCompleteStmt *sql.Stmt
var taskHeartbeatStmt *sql.Stmt
Expand Down Expand Up @@ -767,8 +768,16 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.
util.Assert(command.ReadEnquableTasks != nil, "command must not be nil")
results[i][j], err = w.readEnqueueableTasks(tx, command.ReadEnquableTasks)
case t_aio.CreateTask:
if taskInsertStmt == nil {
taskInsertStmt, err = tx.Prepare(TASK_INSERT_STATEMENT)
if err != nil {
return nil, err
}
defer taskInsertStmt.Close()
}

util.Assert(command.CreateTask != nil, "command must not be nil")
results[i][j], err = w.createTask(tx, command.CreateTask)
results[i][j], err = w.createTask(tx, taskInsertStmt, command.CreateTask)
case t_aio.CreateTasks:
if tasksInsertStmt == nil {
tasksInsertStmt, err = tx.Prepare(TASK_INSERT_ALL_STATEMENT)
Expand Down Expand Up @@ -823,8 +832,16 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.
defer promiseInsertStmt.Close()
}

if taskInsertStmt == nil {
taskInsertStmt, err = tx.Prepare(TASK_INSERT_STATEMENT)
if err != nil {
return nil, err
}
defer taskInsertStmt.Close()
}

util.Assert(command.CreatePromiseAndTask != nil, "createPromiseAndTask command must bot be nil")
results[i][j], err = w.createPromiseAndTask(tx, promiseInsertStmt, command.CreatePromiseAndTask)
results[i][j], err = w.createPromiseAndTask(tx, promiseInsertStmt, taskInsertStmt, command.CreatePromiseAndTask)

default:
panic(fmt.Sprintf("invalid command: %s", command.Kind.String()))
Expand Down Expand Up @@ -1043,7 +1060,7 @@ func (w *PostgresStoreWorker) createPromise(_ *sql.Tx, stmt *sql.Stmt, cmd *t_ai
}, nil
}

func (w *PostgresStoreWorker) createPromiseAndTask(tx *sql.Tx, promiseStmt *sql.Stmt, cmd *t_aio.CreatePromiseAndTaskCommand) (*t_aio.Result, error) {
func (w *PostgresStoreWorker) createPromiseAndTask(tx *sql.Tx, promiseStmt *sql.Stmt, taskStmt *sql.Stmt, cmd *t_aio.CreatePromiseAndTaskCommand) (*t_aio.Result, error) {
promiseResult, err := w.createPromise(tx, promiseStmt, cmd.PromiseCommand)
if err != nil {
return nil, err
Expand All @@ -1057,7 +1074,7 @@ func (w *PostgresStoreWorker) createPromiseAndTask(tx *sql.Tx, promiseStmt *sql.
}, nil
}

taskResult, err := w.createTask(tx, cmd.TaskCommand)
taskResult, err := w.createTask(tx, taskStmt, cmd.TaskCommand)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1626,7 +1643,7 @@ func (w *PostgresStoreWorker) readEnqueueableTasks(tx *sql.Tx, cmd *t_aio.ReadEn
}, nil
}

func (w *PostgresStoreWorker) createTask(tx *sql.Tx, cmd *t_aio.CreateTaskCommand) (*t_aio.Result, error) {
func (w *PostgresStoreWorker) createTask(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.CreateTaskCommand) (*t_aio.Result, error) {
util.Assert(cmd.Recv != nil, "recv must not be nil")
util.Assert(cmd.Mesg != nil, "mesg must not be nil")
util.Assert(cmd.State.In(task.Init|task.Claimed), "state must be init or claimed")
Expand All @@ -1637,16 +1654,15 @@ func (w *PostgresStoreWorker) createTask(tx *sql.Tx, cmd *t_aio.CreateTaskComman
return nil, store.StoreErr(err)
}

var lastInsertId string
rowsAffected := int64(1)
row := tx.QueryRow(TASK_INSERT_STATEMENT, cmd.Id, cmd.Recv, mesg, cmd.Timeout, cmd.ProcessId, cmd.State, cmd.Mesg.Root, cmd.Ttl, cmd.ExpiresAt, cmd.CreatedOn)
// insert
res, err := stmt.Exec(cmd.Id, cmd.Recv, mesg, cmd.Timeout, cmd.ProcessId, cmd.State, cmd.Mesg.Root, cmd.Ttl, cmd.ExpiresAt, cmd.CreatedOn)
if err != nil {
return nil, err
}

if err := row.Scan(&lastInsertId); err != nil {
if err == sql.ErrNoRows {
rowsAffected = 0
} else {
return nil, store.StoreErr(err)
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return nil, err
}

return &t_aio.Result{
Expand Down

0 comments on commit 0199beb

Please sign in to comment.