Skip to content

Commit

Permalink
feat(locks): improve usability by accepting delta (#216)
Browse files Browse the repository at this point in the history
* feat(locks): improve usability by accepting delta

* feat(locks): addressed feedback from David

* fix(locks): no underscores in openapi

* fix(locks): add missing field

---------

Co-authored-by: Gabriel Guerra <[email protected]>
  • Loading branch information
guergabo and Gabriel Guerra authored Jan 24, 2024
1 parent 8ae847f commit 0773fab
Show file tree
Hide file tree
Showing 19 changed files with 273 additions and 279 deletions.
38 changes: 19 additions & 19 deletions api/locks-openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ paths:
/locks/heartbeat:
post:
summary: heartbeat
description: update heartbeat for all locks that match the process_id
description: update heartbeat for all locks that match the processId
operationId: heartbeat
requestBody:
content:
Expand Down Expand Up @@ -62,32 +62,32 @@ components:
Lock:
type: object
properties:
resource_id:
resourceId:
type: string
process_id:
processId:
type: string
execution_id:
executionId:
type: string
timeout:
expiryInSeconds:
type: integer
format: int64
expiresAt:
type: integer
format: int64
readOnly: true
required:
- resource_id
- process_id
- execution_id
- timeout
- resourceId
- processId
- executionId
- expiryInSeconds

HeartbeatRequest:
type: object
properties:
process_id:
processId:
type: string
timeout:
type: integer
format: int64
required:
- process_id
- timeout
- processId

HeartbeatResponse:
type: object
Expand All @@ -99,10 +99,10 @@ components:
ReleaseLockRequest:
type: object
properties:
resource_id:
resourceId:
type: string
execution_id:
executionId:
type: string
required:
- resource_id
- execution_id
- resourceId
- executionId
10 changes: 5 additions & 5 deletions internal/app/coroutines/acquireLock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func AcquireLock(metadata *metadata.Metadata, req *t_api.Request, res CallBackFn
ResourceId: req.AcquireLock.ResourceId,
ProcessId: req.AcquireLock.ProcessId,
ExecutionId: req.AcquireLock.ExecutionId,
Timeout: req.AcquireLock.Timeout,
Timeout: c.Time() + (req.AcquireLock.ExpiryInSeconds * 1000), // from s to ms
},
},
},
Expand Down Expand Up @@ -62,10 +62,10 @@ func AcquireLock(metadata *metadata.Metadata, req *t_api.Request, res CallBackFn
AcquireLock: &t_api.AcquireLockResponse{
Status: t_api.StatusCreated,
Lock: &lock.Lock{
ResourceId: req.AcquireLock.ResourceId,
ProcessId: req.AcquireLock.ProcessId,
ExecutionId: req.AcquireLock.ExecutionId,
Timeout: req.AcquireLock.Timeout,
ResourceId: req.AcquireLock.ResourceId,
ProcessId: req.AcquireLock.ProcessId,
ExecutionId: req.AcquireLock.ExecutionId,
ExpiryInSeconds: req.AcquireLock.ExpiryInSeconds,
},
},
}, nil)
Expand Down
1 change: 0 additions & 1 deletion internal/app/coroutines/heartbeatLocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func HeartbeatLocks(metadata *metadata.Metadata, req *t_api.Request, res CallBac
Kind: t_aio.HeartbeatLocks,
HeartbeatLocks: &t_aio.HeartbeatLocksCommand{
ProcessId: req.HeartbeatLocks.ProcessId,
Timeout: req.HeartbeatLocks.Timeout,
},
},
},
Expand Down
25 changes: 14 additions & 11 deletions internal/app/subsystems/aio/store/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
const (
CREATE_TABLE_STATEMENT = `
CREATE TABLE IF NOT EXISTS locks (
resource_id TEXT,
process_id TEXT,
execution_id TEXT,
timeout BIGINT,
resource_id TEXT,
process_id TEXT,
execution_id TEXT,
expiry_in_seconds BIGINT,
timeout BIGINT,
PRIMARY KEY(resource_id)
);
Expand Down Expand Up @@ -119,30 +120,31 @@ const (

LOCK_READ_STATEMENT = `
SELECT
resource_id, process_id, execution_id, timeout
resource_id, process_id, execution_id, expiry_in_seconds, timeout
FROM
locks
WHERE
resource_id = $1`

LOCK_ACQUIRE_STATEMENT = `
INSERT INTO locks
(resource_id, process_id, execution_id, timeout)
(resource_id, process_id, execution_id, expiry_in_seconds, timeout)
VALUES
($1, $2, $3, $4)
($1, $2, $3, $4, $5)
ON CONFLICT(resource_id)
DO UPDATE SET
process_id = EXCLUDED.process_id,
expiry_in_seconds = EXCLUDED.expiry_in_seconds,
timeout = EXCLUDED.timeout
WHERE locks.execution_id = EXCLUDED.execution_id`

LOCK_HEARTBEAT_STATEMENT = `
UPDATE
locks
SET
timeout = $1
timeout = timeout + (expiry_in_seconds * 1000)
WHERE
process_id = $2`
process_id = $1`

LOCK_RELEASE_STATEMENT = `
DELETE FROM locks WHERE resource_id = $1 AND execution_id = $2`
Expand Down Expand Up @@ -730,6 +732,7 @@ func (w *PostgresStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (
&record.ResourceId,
&record.ProcessId,
&record.ExecutionId,
&record.ExpiryInSeconds,
&record.Timeout,
); err != nil {
if err == sql.ErrNoRows {
Expand All @@ -755,7 +758,7 @@ func (w *PostgresStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (

func (w *PostgresStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.AcquireLockCommand) (*t_aio.Result, error) {
// insert
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.Timeout)
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.ExpiryInSeconds, cmd.Timeout)
if err != nil {
return nil, err
}
Expand All @@ -775,7 +778,7 @@ func (w *PostgresStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio

func (w *PostgresStoreWorker) hearbeatLocks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.HeartbeatLocksCommand) (*t_aio.Result, error) {
// update
res, err := stmt.Exec(cmd.Timeout, cmd.ProcessId)
res, err := stmt.Exec(cmd.ProcessId)
if err != nil {
return nil, err
}
Expand Down
23 changes: 13 additions & 10 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
const (
CREATE_TABLE_STATEMENT = `
CREATE TABLE IF NOT EXISTS locks (
resource_id TEXT UNIQUE,
process_id TEXT,
execution_id TEXT,
timeout INTEGER
resource_id TEXT UNIQUE,
process_id TEXT,
execution_id TEXT,
expiry_in_seconds INTEGER,
timeout INTEGER
);
CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id);
Expand Down Expand Up @@ -108,20 +109,21 @@ const (

LOCK_READ_STATEMENT = `
SELECT
resource_id, process_id, execution_id, timeout
resource_id, process_id, execution_id, expiry_in_seconds, timeout
FROM
locks
WHERE
resource_id = ?`

LOCK_ACQUIRE_STATEMENT = `
INSERT INTO locks
(resource_id, process_id, execution_id, timeout)
(resource_id, process_id, execution_id, expiry_in_seconds, timeout)
VALUES
(?, ?, ?, ?)
(?, ?, ?, ?, ?)
ON CONFLICT(resource_id)
DO UPDATE SET
process_id = excluded.process_id,
expiry_in_seconds = excluded.expiry_in_seconds,
timeout = excluded.timeout
WHERE
execution_id = excluded.execution_id`
Expand All @@ -130,7 +132,7 @@ const (
UPDATE
locks
SET
timeout = ?
timeout = timeout + (expiry_in_seconds * 1000)
WHERE
process_id = ?`

Expand Down Expand Up @@ -692,6 +694,7 @@ func (w *SqliteStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (*t
&record.ResourceId,
&record.ProcessId,
&record.ExecutionId,
&record.ExpiryInSeconds,
&record.Timeout,
); err != nil {
if err == sql.ErrNoRows {
Expand All @@ -717,7 +720,7 @@ func (w *SqliteStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (*t

func (w *SqliteStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.AcquireLockCommand) (*t_aio.Result, error) {
// insert
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.Timeout)
res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.ExpiryInSeconds, cmd.Timeout)
if err != nil {
return nil, err
}
Expand All @@ -737,7 +740,7 @@ func (w *SqliteStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.A

func (w *SqliteStoreWorker) hearbeatLocks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.HeartbeatLocksCommand) (*t_aio.Result, error) {
// update
res, err := stmt.Exec(cmd.Timeout, cmd.ProcessId)
res, err := stmt.Exec(cmd.ProcessId)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 0773fab

Please sign in to comment.