Skip to content

Commit

Permalink
Pad ids with zeros to help with lexographical ordering in dst (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr authored Jan 9, 2025
1 parent 41d13d9 commit eba2e6a
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 24 deletions.
2 changes: 1 addition & 1 deletion internal/app/coroutines/enqueueTasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func EnqueueTasks(config *system.Config, tags map[string]string) gocoro.Coroutin
util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == 1, "completion must have one result")

result := completion.Store.Results[0].ReadEnquableTasks
result := completion.Store.Results[0].ReadEnqueueableTasks
util.Assert(result != nil, "result must not be nil")

commands := []*t_aio.Command{}
Expand Down
10 changes: 5 additions & 5 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ const (
ORDER BY root_promise_id, id
LIMIT $3`

TASK_SELECT_ENQUABLE_STATEMENT = `
TASK_SELECT_ENQUEUEABLE_STATEMENT = `
SELECT DISTINCT ON (root_promise_id)
id,
process_id,
Expand Down Expand Up @@ -763,7 +763,7 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.
results[i][j], err = w.readTasks(tx, command.ReadTasks)
case t_aio.ReadEnqueueableTasks:
util.Assert(command.ReadEnquableTasks != nil, "command must not be nil")
results[i][j], err = w.readEnquableTasks(tx, command.ReadEnquableTasks)
results[i][j], err = w.readEnqueueableTasks(tx, command.ReadEnquableTasks)
case t_aio.CreateTask:
util.Assert(command.CreateTask != nil, "command must not be nil")
results[i][j], err = w.createTask(tx, command.CreateTask)
Expand Down Expand Up @@ -1541,8 +1541,8 @@ func (w *PostgresStoreWorker) readTasks(tx *sql.Tx, cmd *t_aio.ReadTasksCommand)
}, nil
}

func (w *PostgresStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) {
rows, err := tx.Query(TASK_SELECT_ENQUABLE_STATEMENT, cmd.Limit)
func (w *PostgresStoreWorker) readEnqueueableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) {
rows, err := tx.Query(TASK_SELECT_ENQUEUEABLE_STATEMENT, cmd.Limit)
if err != nil {
return nil, store.StoreErr(err)
}
Expand Down Expand Up @@ -1577,7 +1577,7 @@ func (w *PostgresStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnque

return &t_aio.Result{
Kind: t_aio.ReadEnqueueableTasks,
ReadEnquableTasks: &t_aio.QueryTasksResult{
ReadEnqueueableTasks: &t_aio.QueryTasksResult{
RowsReturned: rowsReturned,
Records: records,
},
Expand Down
10 changes: 5 additions & 5 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ const (
ORDER BY root_promise_id, id
LIMIT ?`

TASK_SELECT_ENQUABLE_STATEMENT = `
TASK_SELECT_ENQUEUEABLE_STATEMENT = `
SELECT
id,
process_id,
Expand Down Expand Up @@ -719,7 +719,7 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*t_aio.Tr
results[i][j], err = w.readTasks(tx, command.ReadTasks)
case t_aio.ReadEnqueueableTasks:
util.Assert(command.ReadEnquableTasks != nil, "command must not be nil")
results[i][j], err = w.readEnquableTasks(tx, command.ReadEnquableTasks)
results[i][j], err = w.readEnqueueableTasks(tx, command.ReadEnquableTasks)
case t_aio.CreateTask:
if taskInsertStmt == nil {
taskInsertStmt, err = tx.Prepare(TASK_INSERT_STATEMENT)
Expand Down Expand Up @@ -1532,8 +1532,8 @@ func (w *SqliteStoreWorker) readTasks(tx *sql.Tx, cmd *t_aio.ReadTasksCommand) (
}, nil
}

func (w *SqliteStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) {
rows, err := tx.Query(TASK_SELECT_ENQUABLE_STATEMENT, cmd.Limit)
func (w *SqliteStoreWorker) readEnqueueableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueueableTasksCommand) (*t_aio.Result, error) {
rows, err := tx.Query(TASK_SELECT_ENQUEUEABLE_STATEMENT, cmd.Limit)
if err != nil {
return nil, store.StoreErr(err)
}
Expand Down Expand Up @@ -1568,7 +1568,7 @@ func (w *SqliteStoreWorker) readEnquableTasks(tx *sql.Tx, cmd *t_aio.ReadEnqueue

return &t_aio.Result{
Kind: t_aio.ReadEnqueueableTasks,
ReadEnquableTasks: &t_aio.QueryTasksResult{
ReadEnqueueableTasks: &t_aio.QueryTasksResult{
RowsReturned: rowsReturned,
Records: records,
},
Expand Down
6 changes: 3 additions & 3 deletions internal/app/subsystems/aio/store/test/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -3241,7 +3241,7 @@ var TestCases = []*testCase{
},
{
Kind: t_aio.ReadEnqueueableTasks,
ReadEnquableTasks: &t_aio.QueryTasksResult{
ReadEnqueueableTasks: &t_aio.QueryTasksResult{
RowsReturned: 2,
Records: []*task.TaskRecord{
{
Expand Down Expand Up @@ -3273,7 +3273,7 @@ var TestCases = []*testCase{
},
{
Kind: t_aio.ReadEnqueueableTasks,
ReadEnquableTasks: &t_aio.QueryTasksResult{
ReadEnqueueableTasks: &t_aio.QueryTasksResult{
RowsReturned: 1,
Records: []*task.TaskRecord{
{
Expand All @@ -3296,7 +3296,7 @@ var TestCases = []*testCase{
},
{
Kind: t_aio.ReadEnqueueableTasks,
ReadEnquableTasks: &t_aio.QueryTasksResult{
ReadEnqueueableTasks: &t_aio.QueryTasksResult{
RowsReturned: 0,
},
},
Expand Down
16 changes: 8 additions & 8 deletions internal/kernel/t_aio/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,14 @@ type Result struct {
DeleteSchedule *AlterSchedulesResult

// TASKS
ReadTask *QueryTasksResult
ReadTasks *QueryTasksResult
ReadEnquableTasks *QueryTasksResult
CreateTask *AlterTasksResult
CreateTasks *AlterTasksResult
CompleteTasks *AlterTasksResult
UpdateTask *AlterTasksResult
HeartbeatTasks *AlterTasksResult
ReadTask *QueryTasksResult
ReadTasks *QueryTasksResult
ReadEnqueueableTasks *QueryTasksResult
CreateTask *AlterTasksResult
CreateTasks *AlterTasksResult
CompleteTasks *AlterTasksResult
UpdateTask *AlterTasksResult
HeartbeatTasks *AlterTasksResult

// LOCKS
ReadLock *QueryLocksResult
Expand Down
2 changes: 1 addition & 1 deletion test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System)

system.Tick(time)

// now read from the callback channel
// now read from the backchannel
for len(d.config.Backchannel) > 0 {
var bc *Backchannel
obj := <-d.config.Backchannel
Expand Down
6 changes: 5 additions & 1 deletion test/dst/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ type RequestGenerator func(*rand.Rand, int64) *t_api.Request

func NewGenerator(r *rand.Rand, config *Config) *Generator {
idSet := make([]string, config.Ids)
width := len(strconv.Itoa(config.Ids))

for i := 0; i < config.Ids; i++ {
idSet[i] = strconv.Itoa(i)
// pad ids with leading zeros to ensure ids are the same length
// this helps with lexigraphical sorting across different databases
idSet[i] = fmt.Sprintf(fmt.Sprintf("%%0%dd", width), i)
}

idempotencyKeySet := []*idempotency.Key{}
Expand Down

0 comments on commit eba2e6a

Please sign in to comment.