From eba2e6a47830cdb3dcd172857e83edfadaf2c37f Mon Sep 17 00:00:00 2001 From: David Farr Date: Thu, 9 Jan 2025 13:55:20 -0800 Subject: [PATCH] Pad ids with zeros to help with lexographical ordering in dst (#511) --- internal/app/coroutines/enqueueTasks.go | 2 +- .../subsystems/aio/store/postgres/postgres.go | 10 +++++----- .../app/subsystems/aio/store/sqlite/sqlite.go | 10 +++++----- internal/app/subsystems/aio/store/test/cases.go | 6 +++--- internal/kernel/t_aio/store.go | 16 ++++++++-------- test/dst/dst.go | 2 +- test/dst/generator.go | 6 +++++- 7 files changed, 28 insertions(+), 24 deletions(-) diff --git a/internal/app/coroutines/enqueueTasks.go b/internal/app/coroutines/enqueueTasks.go index 96bfc21b..e9bc7855 100644 --- a/internal/app/coroutines/enqueueTasks.go +++ b/internal/app/coroutines/enqueueTasks.go @@ -42,7 +42,7 @@ func EnqueueTasks(config *system.Config, tags map[string]string) gocoro.Coroutin util.Assert(completion.Store != nil, "completion must not be nil") util.Assert(len(completion.Store.Results) == 1, "completion must have one result") - result := completion.Store.Results[0].ReadEnquableTasks + result := completion.Store.Results[0].ReadEnqueueableTasks util.Assert(result != nil, "result must not be nil") commands := []*t_aio.Command{} diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 60682419..3555ce20 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -300,7 +300,7 @@ const ( ORDER BY root_promise_id, id LIMIT $3` - TASK_SELECT_ENQUABLE_STATEMENT = ` + TASK_SELECT_ENQUEUEABLE_STATEMENT = ` SELECT DISTINCT ON (root_promise_id) id, process_id, @@ -763,7 +763,7 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio. results[i][j], err = w.readTasks(tx, command.ReadTasks) case t_aio.ReadEnqueueableTasks: util.Assert(command.ReadEnquableTasks != nil, "command must not be nil") - results[i][j], err = w.readEnquableTasks(tx, command.ReadEnquableTasks) + results[i][j], err = w.readEnqueueableTasks(tx, command.ReadEnquableTasks) case t_aio.CreateTask: util.Assert(command.CreateTask != nil, "command must not be nil") results[i][j], err = w.createTask(tx, command.CreateTask) @@ -1541,8 +1541,8 @@ func (w *PostgresStoreWorker) readTasks(tx *sql.Tx, cmd *t_aio.ReadTasksCommand) }, nil } -func (w *PostgresStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) { - rows, err := tx.Query(TASK_SELECT_ENQUABLE_STATEMENT, cmd.Limit) +func (w *PostgresStoreWorker) readEnqueueableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) { + rows, err := tx.Query(TASK_SELECT_ENQUEUEABLE_STATEMENT, cmd.Limit) if err != nil { return nil, store.StoreErr(err) } @@ -1577,7 +1577,7 @@ func (w *PostgresStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnque return &t_aio.Result{ Kind: t_aio.ReadEnqueueableTasks, - ReadEnquableTasks: &t_aio.QueryTasksResult{ + ReadEnqueueableTasks: &t_aio.QueryTasksResult{ RowsReturned: rowsReturned, Records: records, }, diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 585ce027..bef36638 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -289,7 +289,7 @@ const ( ORDER BY root_promise_id, id LIMIT ?` - TASK_SELECT_ENQUABLE_STATEMENT = ` + TASK_SELECT_ENQUEUEABLE_STATEMENT = ` SELECT id, process_id, @@ -719,7 +719,7 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Tr results[i][j], err = w.readTasks(tx, command.ReadTasks) case t_aio.ReadEnqueueableTasks: util.Assert(command.ReadEnquableTasks != nil, "command must not be nil") - results[i][j], err = w.readEnquableTasks(tx, command.ReadEnquableTasks) + results[i][j], err = w.readEnqueueableTasks(tx, command.ReadEnquableTasks) case t_aio.CreateTask: if taskInsertStmt == nil { taskInsertStmt, err = tx.Prepare(TASK_INSERT_STATEMENT) @@ -1532,8 +1532,8 @@ func (w *SqliteStoreWorker) readTasks(tx *sql.Tx, cmd *t_aio.ReadTasksCommand) ( }, nil } -func (w *SqliteStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) { - rows, err := tx.Query(TASK_SELECT_ENQUABLE_STATEMENT, cmd.Limit) +func (w *SqliteStoreWorker) readEnqueueableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) { + rows, err := tx.Query(TASK_SELECT_ENQUEUEABLE_STATEMENT, cmd.Limit) if err != nil { return nil, store.StoreErr(err) } @@ -1568,7 +1568,7 @@ func (w *SqliteStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueue return &t_aio.Result{ Kind: t_aio.ReadEnqueueableTasks, - ReadEnquableTasks: &t_aio.QueryTasksResult{ + ReadEnqueueableTasks: &t_aio.QueryTasksResult{ RowsReturned: rowsReturned, Records: records, }, diff --git a/internal/app/subsystems/aio/store/test/cases.go b/internal/app/subsystems/aio/store/test/cases.go index 759a41c5..ad2432c2 100644 --- a/internal/app/subsystems/aio/store/test/cases.go +++ b/internal/app/subsystems/aio/store/test/cases.go @@ -3241,7 +3241,7 @@ var TestCases = []*testCase{ }, { Kind: t_aio.ReadEnqueueableTasks, - ReadEnquableTasks: &t_aio.QueryTasksResult{ + ReadEnqueueableTasks: &t_aio.QueryTasksResult{ RowsReturned: 2, Records: []*task.TaskRecord{ { @@ -3273,7 +3273,7 @@ var TestCases = []*testCase{ }, { Kind: t_aio.ReadEnqueueableTasks, - ReadEnquableTasks: &t_aio.QueryTasksResult{ + ReadEnqueueableTasks: &t_aio.QueryTasksResult{ RowsReturned: 1, Records: []*task.TaskRecord{ { @@ -3296,7 +3296,7 @@ var TestCases = []*testCase{ }, { Kind: t_aio.ReadEnqueueableTasks, - ReadEnquableTasks: &t_aio.QueryTasksResult{ + ReadEnqueueableTasks: &t_aio.QueryTasksResult{ RowsReturned: 0, }, }, diff --git a/internal/kernel/t_aio/store.go b/internal/kernel/t_aio/store.go index bb54cf76..9b056cec 100644 --- a/internal/kernel/t_aio/store.go +++ b/internal/kernel/t_aio/store.go @@ -203,14 +203,14 @@ type Result struct { DeleteSchedule *AlterSchedulesResult // TASKS - ReadTask *QueryTasksResult - ReadTasks *QueryTasksResult - ReadEnquableTasks *QueryTasksResult - CreateTask *AlterTasksResult - CreateTasks *AlterTasksResult - CompleteTasks *AlterTasksResult - UpdateTask *AlterTasksResult - HeartbeatTasks *AlterTasksResult + ReadTask *QueryTasksResult + ReadTasks *QueryTasksResult + ReadEnqueueableTasks *QueryTasksResult + CreateTask *AlterTasksResult + CreateTasks *AlterTasksResult + CompleteTasks *AlterTasksResult + UpdateTask *AlterTasksResult + HeartbeatTasks *AlterTasksResult // LOCKS ReadLock *QueryLocksResult diff --git a/test/dst/dst.go b/test/dst/dst.go index 9c745cff..4fa6aa5d 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -226,7 +226,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System) system.Tick(time) - // now read from the callback channel + // now read from the backchannel for len(d.config.Backchannel) > 0 { var bc *Backchannel obj := <-d.config.Backchannel diff --git a/test/dst/generator.go b/test/dst/generator.go index 5a550ace..98c85638 100644 --- a/test/dst/generator.go +++ b/test/dst/generator.go @@ -29,8 +29,12 @@ type RequestGenerator func(*rand.Rand, int64) *t_api.Request func NewGenerator(r *rand.Rand, config *Config) *Generator { idSet := make([]string, config.Ids) + width := len(strconv.Itoa(config.Ids)) + for i := 0; i < config.Ids; i++ { - idSet[i] = strconv.Itoa(i) + // pad ids with leading zeros to ensure ids are the same length + // this helps with lexigraphical sorting across different databases + idSet[i] = fmt.Sprintf(fmt.Sprintf("%%0%dd", width), i) } idempotencyKeySet := []*idempotency.Key{}