From 0773faba49a50a2ce85d1e8b0029909f8783f364 Mon Sep 17 00:00:00 2001 From: guergabo <65991626+guergabo@users.noreply.github.com> Date: Tue, 23 Jan 2024 20:50:04 -0500 Subject: [PATCH] feat(locks): improve usability by accepting delta (#216) * feat(locks): improve usability by accepting delta * feat(locks): addressed feedback from David * fix(locks): no underscores in openapi * fix(locks): add missing field --------- Co-authored-by: Gabriel Guerra --- api/locks-openapi.yml | 38 ++--- internal/app/coroutines/acquireLock.go | 10 +- internal/app/coroutines/heartbeatLocks.go | 1 - .../subsystems/aio/store/postgres/postgres.go | 25 ++-- .../app/subsystems/aio/store/sqlite/sqlite.go | 23 +-- .../app/subsystems/aio/store/test/cases.go | 100 +++++++------ .../app/subsystems/api/grpc/api/lock.pb.go | 132 +++++++++--------- .../app/subsystems/api/grpc/api/lock.proto | 6 +- internal/app/subsystems/api/grpc/grpc_test.go | 69 ++++----- internal/app/subsystems/api/grpc/lock.go | 22 ++- internal/app/subsystems/api/http/http_test.go | 26 ++-- internal/app/subsystems/api/service/lock.go | 12 +- .../app/subsystems/api/service/request.go | 9 +- internal/kernel/t_aio/store.go | 10 +- internal/kernel/t_api/request.go | 16 +-- pkg/lock/lock.go | 14 +- pkg/lock/record.go | 18 +-- test/dst/generator.go | 12 +- test/dst/model.go | 9 +- 19 files changed, 273 insertions(+), 279 deletions(-) diff --git a/api/locks-openapi.yml b/api/locks-openapi.yml index 65332d17..604c7463 100644 --- a/api/locks-openapi.yml +++ b/api/locks-openapi.yml @@ -26,7 +26,7 @@ paths: /locks/heartbeat: post: summary: heartbeat - description: update heartbeat for all locks that match the process_id + description: update heartbeat for all locks that match the processId operationId: heartbeat requestBody: content: @@ -62,32 +62,32 @@ components: Lock: type: object properties: - resource_id: + resourceId: type: string - process_id: + processId: type: string - execution_id: + executionId: type: string - timeout: + expiryInSeconds: type: integer format: int64 + expiresAt: + type: integer + format: int64 + readOnly: true required: - - resource_id - - process_id - - execution_id - - timeout + - resourceId + - processId + - executionId + - expiryInSeconds HeartbeatRequest: type: object properties: - process_id: + processId: type: string - timeout: - type: integer - format: int64 required: - - process_id - - timeout + - processId HeartbeatResponse: type: object @@ -99,10 +99,10 @@ components: ReleaseLockRequest: type: object properties: - resource_id: + resourceId: type: string - execution_id: + executionId: type: string required: - - resource_id - - execution_id + - resourceId + - executionId diff --git a/internal/app/coroutines/acquireLock.go b/internal/app/coroutines/acquireLock.go index 6f09dd8c..49e28bfb 100644 --- a/internal/app/coroutines/acquireLock.go +++ b/internal/app/coroutines/acquireLock.go @@ -27,7 +27,7 @@ func AcquireLock(metadata *metadata.Metadata, req *t_api.Request, res CallBackFn ResourceId: req.AcquireLock.ResourceId, ProcessId: req.AcquireLock.ProcessId, ExecutionId: req.AcquireLock.ExecutionId, - Timeout: req.AcquireLock.Timeout, + Timeout: c.Time() + (req.AcquireLock.ExpiryInSeconds * 1000), // from s to ms }, }, }, @@ -62,10 +62,10 @@ func AcquireLock(metadata *metadata.Metadata, req *t_api.Request, res CallBackFn AcquireLock: &t_api.AcquireLockResponse{ Status: t_api.StatusCreated, Lock: &lock.Lock{ - ResourceId: req.AcquireLock.ResourceId, - ProcessId: req.AcquireLock.ProcessId, - ExecutionId: req.AcquireLock.ExecutionId, - Timeout: req.AcquireLock.Timeout, + ResourceId: req.AcquireLock.ResourceId, + ProcessId: req.AcquireLock.ProcessId, + ExecutionId: req.AcquireLock.ExecutionId, + ExpiryInSeconds: req.AcquireLock.ExpiryInSeconds, }, }, }, nil) diff --git a/internal/app/coroutines/heartbeatLocks.go b/internal/app/coroutines/heartbeatLocks.go index 8392492a..9771bd15 100644 --- a/internal/app/coroutines/heartbeatLocks.go +++ b/internal/app/coroutines/heartbeatLocks.go @@ -23,7 +23,6 @@ func HeartbeatLocks(metadata *metadata.Metadata, req *t_api.Request, res CallBac Kind: t_aio.HeartbeatLocks, HeartbeatLocks: &t_aio.HeartbeatLocksCommand{ ProcessId: req.HeartbeatLocks.ProcessId, - Timeout: req.HeartbeatLocks.Timeout, }, }, }, diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index ee698ea8..f7f8a899 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -28,10 +28,11 @@ import ( const ( CREATE_TABLE_STATEMENT = ` CREATE TABLE IF NOT EXISTS locks ( - resource_id TEXT, - process_id TEXT, - execution_id TEXT, - timeout BIGINT, + resource_id TEXT, + process_id TEXT, + execution_id TEXT, + expiry_in_seconds BIGINT, + timeout BIGINT, PRIMARY KEY(resource_id) ); @@ -119,7 +120,7 @@ const ( LOCK_READ_STATEMENT = ` SELECT - resource_id, process_id, execution_id, timeout + resource_id, process_id, execution_id, expiry_in_seconds, timeout FROM locks WHERE @@ -127,12 +128,13 @@ const ( LOCK_ACQUIRE_STATEMENT = ` INSERT INTO locks - (resource_id, process_id, execution_id, timeout) + (resource_id, process_id, execution_id, expiry_in_seconds, timeout) VALUES - ($1, $2, $3, $4) + ($1, $2, $3, $4, $5) ON CONFLICT(resource_id) DO UPDATE SET process_id = EXCLUDED.process_id, + expiry_in_seconds = EXCLUDED.expiry_in_seconds, timeout = EXCLUDED.timeout WHERE locks.execution_id = EXCLUDED.execution_id` @@ -140,9 +142,9 @@ const ( UPDATE locks SET - timeout = $1 + timeout = timeout + (expiry_in_seconds * 1000) WHERE - process_id = $2` + process_id = $1` LOCK_RELEASE_STATEMENT = ` DELETE FROM locks WHERE resource_id = $1 AND execution_id = $2` @@ -730,6 +732,7 @@ func (w *PostgresStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) ( &record.ResourceId, &record.ProcessId, &record.ExecutionId, + &record.ExpiryInSeconds, &record.Timeout, ); err != nil { if err == sql.ErrNoRows { @@ -755,7 +758,7 @@ func (w *PostgresStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) ( func (w *PostgresStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.AcquireLockCommand) (*t_aio.Result, error) { // insert - res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.Timeout) + res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.ExpiryInSeconds, cmd.Timeout) if err != nil { return nil, err } @@ -775,7 +778,7 @@ func (w *PostgresStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio func (w *PostgresStoreWorker) hearbeatLocks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.HeartbeatLocksCommand) (*t_aio.Result, error) { // update - res, err := stmt.Exec(cmd.Timeout, cmd.ProcessId) + res, err := stmt.Exec(cmd.ProcessId) if err != nil { return nil, err } diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 384b516c..5d9f5b47 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -28,10 +28,11 @@ import ( const ( CREATE_TABLE_STATEMENT = ` CREATE TABLE IF NOT EXISTS locks ( - resource_id TEXT UNIQUE, - process_id TEXT, - execution_id TEXT, - timeout INTEGER + resource_id TEXT UNIQUE, + process_id TEXT, + execution_id TEXT, + expiry_in_seconds INTEGER, + timeout INTEGER ); CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); @@ -108,7 +109,7 @@ const ( LOCK_READ_STATEMENT = ` SELECT - resource_id, process_id, execution_id, timeout + resource_id, process_id, execution_id, expiry_in_seconds, timeout FROM locks WHERE @@ -116,12 +117,13 @@ const ( LOCK_ACQUIRE_STATEMENT = ` INSERT INTO locks - (resource_id, process_id, execution_id, timeout) + (resource_id, process_id, execution_id, expiry_in_seconds, timeout) VALUES - (?, ?, ?, ?) + (?, ?, ?, ?, ?) ON CONFLICT(resource_id) DO UPDATE SET process_id = excluded.process_id, + expiry_in_seconds = excluded.expiry_in_seconds, timeout = excluded.timeout WHERE execution_id = excluded.execution_id` @@ -130,7 +132,7 @@ const ( UPDATE locks SET - timeout = ? + timeout = timeout + (expiry_in_seconds * 1000) WHERE process_id = ?` @@ -692,6 +694,7 @@ func (w *SqliteStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (*t &record.ResourceId, &record.ProcessId, &record.ExecutionId, + &record.ExpiryInSeconds, &record.Timeout, ); err != nil { if err == sql.ErrNoRows { @@ -717,7 +720,7 @@ func (w *SqliteStoreWorker) readLock(tx *sql.Tx, cmd *t_aio.ReadLockCommand) (*t func (w *SqliteStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.AcquireLockCommand) (*t_aio.Result, error) { // insert - res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.Timeout) + res, err := stmt.Exec(cmd.ResourceId, cmd.ProcessId, cmd.ExecutionId, cmd.ExpiryInSeconds, cmd.Timeout) if err != nil { return nil, err } @@ -737,7 +740,7 @@ func (w *SqliteStoreWorker) acquireLock(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.A func (w *SqliteStoreWorker) hearbeatLocks(tx *sql.Tx, stmt *sql.Stmt, cmd *t_aio.HeartbeatLocksCommand) (*t_aio.Result, error) { // update - res, err := stmt.Exec(cmd.Timeout, cmd.ProcessId) + res, err := stmt.Exec(cmd.ProcessId) if err != nil { return nil, err } diff --git a/internal/app/subsystems/aio/store/test/cases.go b/internal/app/subsystems/aio/store/test/cases.go index 498fc774..7f24ffb7 100644 --- a/internal/app/subsystems/aio/store/test/cases.go +++ b/internal/app/subsystems/aio/store/test/cases.go @@ -142,19 +142,21 @@ var TestCases = []*testCase{ { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "barUpdated", - ExecutionId: "baz", - Timeout: 1736571700000, + ResourceId: "foo", + ProcessId: "barUpdated", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571700000, }, }, }, @@ -179,19 +181,21 @@ var TestCases = []*testCase{ { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz1", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz1", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz2", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz2", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, }, @@ -216,35 +220,37 @@ var TestCases = []*testCase{ { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo-1", - ProcessId: "a", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo-1", + ProcessId: "a", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo-2", - ProcessId: "a", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo-2", + ProcessId: "a", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo-3", - ProcessId: "b", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo-3", + ProcessId: "b", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { Kind: t_aio.HeartbeatLocks, HeartbeatLocks: &t_aio.HeartbeatLocksCommand{ ProcessId: "a", - Timeout: 1736572500000, }, }, }, @@ -281,10 +287,11 @@ var TestCases = []*testCase{ { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { @@ -313,10 +320,11 @@ var TestCases = []*testCase{ ReadLock: &t_aio.QueryLocksResult{ RowsReturned: 1, Records: []*lock.LockRecord{{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }}, }, }, @@ -334,10 +342,11 @@ var TestCases = []*testCase{ { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { @@ -369,10 +378,11 @@ var TestCases = []*testCase{ { Kind: t_aio.AcquireLock, AcquireLock: &t_aio.AcquireLockCommand{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, + Timeout: 1736571600000, }, }, { diff --git a/internal/app/subsystems/api/grpc/api/lock.pb.go b/internal/app/subsystems/api/grpc/api/lock.pb.go index 3d0c2c1e..bd84df66 100644 --- a/internal/app/subsystems/api/grpc/api/lock.pb.go +++ b/internal/app/subsystems/api/grpc/api/lock.pb.go @@ -25,10 +25,11 @@ type Lock struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ResourceId string `protobuf:"bytes,1,opt,name=resource_id,json=resourceId,proto3" json:"resource_id,omitempty"` - ProcessId string `protobuf:"bytes,2,opt,name=process_id,json=processId,proto3" json:"process_id,omitempty"` - ExecutionId string `protobuf:"bytes,3,opt,name=execution_id,json=executionId,proto3" json:"execution_id,omitempty"` - Timeout int64 `protobuf:"varint,4,opt,name=timeout,proto3" json:"timeout,omitempty"` + ResourceId string `protobuf:"bytes,1,opt,name=resource_id,json=resourceId,proto3" json:"resource_id,omitempty"` + ProcessId string `protobuf:"bytes,2,opt,name=process_id,json=processId,proto3" json:"process_id,omitempty"` + ExecutionId string `protobuf:"bytes,3,opt,name=execution_id,json=executionId,proto3" json:"execution_id,omitempty"` + ExpiryInSeconds int64 `protobuf:"varint,4,opt,name=expiry_in_seconds,json=expiryInSeconds,proto3" json:"expiry_in_seconds,omitempty"` + ExpiresAt int64 `protobuf:"varint,5,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` } func (x *Lock) Reset() { @@ -84,9 +85,16 @@ func (x *Lock) GetExecutionId() string { return "" } -func (x *Lock) GetTimeout() int64 { +func (x *Lock) GetExpiryInSeconds() int64 { + if x != nil { + return x.ExpiryInSeconds + } + return 0 +} + +func (x *Lock) GetExpiresAt() int64 { if x != nil { - return x.Timeout + return x.ExpiresAt } return 0 } @@ -199,8 +207,7 @@ type HeartbeatLocksRequest struct { unknownFields protoimpl.UnknownFields ProcessId string `protobuf:"bytes,1,opt,name=process_id,json=processId,proto3" json:"process_id,omitempty"` - Timeout int64 `protobuf:"varint,2,opt,name=timeout,proto3" json:"timeout,omitempty"` - RequestId string `protobuf:"bytes,3,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + RequestId string `protobuf:"bytes,2,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` } func (x *HeartbeatLocksRequest) Reset() { @@ -242,13 +249,6 @@ func (x *HeartbeatLocksRequest) GetProcessId() string { return "" } -func (x *HeartbeatLocksRequest) GetTimeout() int64 { - if x != nil { - return x.Timeout - } - return 0 -} - func (x *HeartbeatLocksRequest) GetRequestId() string { if x != nil { return x.RequestId @@ -410,63 +410,65 @@ var file_internal_app_subsystems_api_grpc_api_lock_proto_rawDesc = []byte{ 0x0a, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x04, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x83, 0x01, 0x0a, 0x04, 0x4c, 0x6f, 0x63, 0x6b, + 0x6f, 0x12, 0x04, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0xb4, 0x01, 0x0a, 0x04, 0x4c, 0x6f, 0x63, 0x6b, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, - 0x6e, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x53, 0x0a, - 0x12, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x04, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0a, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x04, 0x6c, - 0x6f, 0x63, 0x6b, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x49, 0x64, 0x22, 0x35, 0x0a, 0x13, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, - 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x04, 0x6c, 0x6f, 0x63, - 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4c, - 0x6f, 0x63, 0x6b, 0x52, 0x04, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x6f, 0x0a, 0x15, 0x48, 0x65, 0x61, - 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x49, - 0x64, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, + 0x6e, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x11, 0x65, 0x78, 0x70, 0x69, 0x72, 0x79, 0x5f, 0x69, 0x6e, + 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0f, + 0x65, 0x78, 0x70, 0x69, 0x72, 0x79, 0x49, 0x6e, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, + 0x1d, 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x09, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x41, 0x74, 0x22, 0x53, + 0x0a, 0x12, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x04, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x04, + 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, + 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x49, 0x64, 0x22, 0x35, 0x0a, 0x13, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, + 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x04, 0x6c, 0x6f, + 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, + 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x04, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x55, 0x0a, 0x15, 0x48, 0x65, + 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, + 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, + 0x64, 0x22, 0x3e, 0x0a, 0x16, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, + 0x63, 0x6b, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x6c, + 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x0d, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, + 0x64, 0x22, 0x77, 0x0a, 0x12, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x72, 0x65, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x78, 0x65, 0x63, + 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, + 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, 0x3e, 0x0a, 0x16, 0x48, 0x65, - 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x41, 0x66, 0x66, - 0x65, 0x63, 0x74, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x6c, 0x6f, 0x63, - 0x6b, 0x73, 0x41, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x22, 0x77, 0x0a, 0x12, 0x52, 0x65, - 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, - 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, - 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, - 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x49, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, - 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xe2, 0x01, 0x0a, 0x05, 0x4c, - 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x44, 0x0a, 0x0b, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, - 0x6f, 0x63, 0x6b, 0x12, 0x18, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x41, 0x63, 0x71, 0x75, 0x69, - 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, - 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x0e, 0x48, 0x65, - 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x1b, 0x2e, 0x6c, - 0x6f, 0x63, 0x6b, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, - 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, - 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x44, 0x0a, 0x0b, 0x52, 0x65, 0x6c, - 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x12, 0x18, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, - 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, - 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, - 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, - 0x73, 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x68, 0x71, 0x2f, 0x72, 0x65, 0x73, 0x6f, 0x6e, 0x61, 0x74, - 0x65, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x73, - 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, - 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x65, + 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x32, 0xe2, 0x01, 0x0a, 0x05, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x44, 0x0a, 0x0b, 0x41, + 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x12, 0x18, 0x2e, 0x6c, 0x6f, 0x63, + 0x6b, 0x2e, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x41, 0x63, 0x71, 0x75, + 0x69, 0x72, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x4d, 0x0a, 0x0e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, + 0x63, 0x6b, 0x73, 0x12, 0x1b, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, + 0x62, 0x65, 0x61, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1c, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, + 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x44, 0x0a, 0x0b, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x12, + 0x18, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, + 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6c, 0x6f, 0x63, 0x6b, + 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x73, 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x68, 0x71, 0x2f, + 0x72, 0x65, 0x73, 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/app/subsystems/api/grpc/api/lock.proto b/internal/app/subsystems/api/grpc/api/lock.proto index 66650460..6c582289 100644 --- a/internal/app/subsystems/api/grpc/api/lock.proto +++ b/internal/app/subsystems/api/grpc/api/lock.proto @@ -14,7 +14,8 @@ message Lock { string resource_id = 1; string process_id = 2; string execution_id = 3; - int64 timeout = 4; + int64 expiry_in_seconds = 4; + int64 expires_at = 5; } message AcquireLockRequest { @@ -28,8 +29,7 @@ message AcquireLockResponse { message HeartbeatLocksRequest { string process_id = 1; - int64 timeout = 2; - string request_id = 3; + string request_id = 2; } message HeartbeatLocksResponse { diff --git a/internal/app/subsystems/api/grpc/grpc_test.go b/internal/app/subsystems/api/grpc/grpc_test.go index 23c8a7a2..ed428fa7 100644 --- a/internal/app/subsystems/api/grpc/grpc_test.go +++ b/internal/app/subsystems/api/grpc/grpc_test.go @@ -1249,19 +1249,19 @@ func TestAcquireLock(t *testing.T) { name: "AcquireLock", grpcReq: &grpcApi.AcquireLockRequest{ Lock: &grpcApi.Lock{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 1, }, }, req: &t_api.Request{ Kind: t_api.AcquireLock, AcquireLock: &t_api.AcquireLockRequest{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 1, }, }, res: &t_api.Response{ @@ -1269,10 +1269,10 @@ func TestAcquireLock(t *testing.T) { AcquireLock: &t_api.AcquireLockResponse{ Status: t_api.StatusCreated, Lock: &lock.Lock{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 1, }, }, }, @@ -1282,10 +1282,10 @@ func TestAcquireLock(t *testing.T) { name: "AcquireLock missing resource id", grpcReq: &grpcApi.AcquireLockRequest{ Lock: &grpcApi.Lock{ - ResourceId: "", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1, + ResourceId: "", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 1, }, }, req: nil, @@ -1296,10 +1296,10 @@ func TestAcquireLock(t *testing.T) { name: "AcquireLock missing process id", grpcReq: &grpcApi.AcquireLockRequest{ Lock: &grpcApi.Lock{ - ResourceId: "foo", - ProcessId: "", - ExecutionId: "baz", - Timeout: 1, + ResourceId: "foo", + ProcessId: "", + ExecutionId: "baz", + ExpiryInSeconds: 1, }, }, req: nil, @@ -1310,10 +1310,10 @@ func TestAcquireLock(t *testing.T) { name: "AcquireLock missing execution id", grpcReq: &grpcApi.AcquireLockRequest{ Lock: &grpcApi.Lock{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "", - Timeout: 1, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "", + ExpiryInSeconds: 1, }, }, req: nil, @@ -1324,10 +1324,10 @@ func TestAcquireLock(t *testing.T) { name: "AcquireLock missing timeout", grpcReq: &grpcApi.AcquireLockRequest{ Lock: &grpcApi.Lock{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 0, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 0, }, }, req: nil, @@ -1385,13 +1385,11 @@ func TestHeartbeatLocks(t *testing.T) { name: "HeartbeatLocks", grpcReq: &grpcApi.HeartbeatLocksRequest{ ProcessId: "foo", - Timeout: 1, }, req: &t_api.Request{ Kind: t_api.HeartbeatLocks, HeartbeatLocks: &t_api.HeartbeatLocksRequest{ ProcessId: "foo", - Timeout: 1, }, }, res: &t_api.Response{ @@ -1406,17 +1404,6 @@ func TestHeartbeatLocks(t *testing.T) { name: "HeartbeatLocks missing process id", grpcReq: &grpcApi.HeartbeatLocksRequest{ ProcessId: "", - Timeout: 1, - }, - req: nil, - res: nil, - code: codes.InvalidArgument, - }, - { - name: "HeartbeatLocks missing timeout", - grpcReq: &grpcApi.HeartbeatLocksRequest{ - ProcessId: "foo", - Timeout: 0, }, req: nil, res: nil, diff --git a/internal/app/subsystems/api/grpc/lock.go b/internal/app/subsystems/api/grpc/lock.go index a566a647..442ddd37 100644 --- a/internal/app/subsystems/api/grpc/lock.go +++ b/internal/app/subsystems/api/grpc/lock.go @@ -30,15 +30,15 @@ func (s *server) AcquireLock(ctx context.Context, req *grpcApi.AcquireLockReques if req.Lock.ExecutionId == "" { return nil, grpcStatus.Error(codes.InvalidArgument, "lock.execution_id must be provided") } - if req.Lock.Timeout == 0 { + if req.Lock.ExpiryInSeconds == 0 { return nil, grpcStatus.Error(codes.InvalidArgument, "lock.timeout must be provided") } body := &service.AcquireLockBody{ - ResourceId: req.Lock.ResourceId, - ProcessId: req.Lock.ProcessId, - ExecutionId: req.Lock.ExecutionId, - Timeout: req.Lock.Timeout, + ResourceId: req.Lock.ResourceId, + ProcessId: req.Lock.ProcessId, + ExecutionId: req.Lock.ExecutionId, + ExpiryInSeconds: req.Lock.ExpiryInSeconds, } resp, err := s.service.AcquireLock(header, body) @@ -61,13 +61,9 @@ func (s *server) HeartbeatLocks(ctx context.Context, req *grpcApi.HeartbeatLocks if req.ProcessId == "" { return nil, grpcStatus.Error(codes.InvalidArgument, "process_id must be provided") } - if req.Timeout == 0 { - return nil, grpcStatus.Error(codes.InvalidArgument, "timeout must be provided") - } body := &service.HeartbeatBody{ ProcessId: req.ProcessId, - Timeout: req.Timeout, } res, err := s.service.Heartbeat(header, body) @@ -115,9 +111,9 @@ func protoLock(lock *lock.Lock) *grpcApi.Lock { } return &grpcApi.Lock{ - ResourceId: lock.ResourceId, - ProcessId: lock.ProcessId, - ExecutionId: lock.ExecutionId, - Timeout: lock.Timeout, + ResourceId: lock.ResourceId, + ProcessId: lock.ProcessId, + ExecutionId: lock.ExecutionId, + ExpiryInSeconds: lock.ExpiryInSeconds, } } diff --git a/internal/app/subsystems/api/http/http_test.go b/internal/app/subsystems/api/http/http_test.go index 197dcd45..904b6dd4 100644 --- a/internal/app/subsystems/api/http/http_test.go +++ b/internal/app/subsystems/api/http/http_test.go @@ -769,15 +769,15 @@ func TestHttpServer(t *testing.T) { "resourceId": "foo", "processId": "bar", "executionId": "baz", - "timeout": 1736571600000 + "expiryInSeconds": 10 }`), req: &t_api.Request{ Kind: t_api.AcquireLock, AcquireLock: &t_api.AcquireLockRequest{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, }, }, res: &t_api.Response{ @@ -785,24 +785,24 @@ func TestHttpServer(t *testing.T) { AcquireLock: &t_api.AcquireLockResponse{ Status: t_api.StatusCreated, Lock: &lock.Lock{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - Timeout: 1736571600000, + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInSeconds: 10, }, }, }, status: 201, }, { - name: "AcquireLock missing resourceId", + name: "AcquireLock missing executionIdc", path: "locks/acquire", method: "POST", body: []byte(`{ "resourceId": "foo", "processId": "bar", "executionId": "", - "timeout": 1736571600000 + "expiryInSeconds": 10 }`), req: nil, res: nil, @@ -813,14 +813,12 @@ func TestHttpServer(t *testing.T) { path: "locks/heartbeat", method: "POST", body: []byte(`{ - "processId": "bar", - "timeout": 1736571600000 + "processId": "bar" }`), req: &t_api.Request{ Kind: t_api.HeartbeatLocks, HeartbeatLocks: &t_api.HeartbeatLocksRequest{ ProcessId: "bar", - Timeout: 1736571600000, }, }, res: &t_api.Response{ diff --git a/internal/app/subsystems/api/service/lock.go b/internal/app/subsystems/api/service/lock.go index 03b34fad..bb7356ae 100644 --- a/internal/app/subsystems/api/service/lock.go +++ b/internal/app/subsystems/api/service/lock.go @@ -15,13 +15,13 @@ func (s *Service) AcquireLock(header *Header, body *AcquireLockBody) (*t_api.Acq util.Assert(body.ResourceId != "", "lock.resource_id must be provided") util.Assert(body.ProcessId != "", "lock.process_id must be provided") util.Assert(body.ExecutionId != "", "lock.execution_id must be provided") - util.Assert(body.Timeout != 0, "lock.timeout must be provided") + util.Assert(body.ExpiryInSeconds != 0, "lock.timeout must be provided") acquireLock := &t_api.AcquireLockRequest{ - ResourceId: body.ResourceId, - ProcessId: body.ProcessId, - ExecutionId: body.ExecutionId, - Timeout: body.Timeout, + ResourceId: body.ResourceId, + ProcessId: body.ProcessId, + ExecutionId: body.ExecutionId, + ExpiryInSeconds: body.ExpiryInSeconds, } cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) @@ -55,11 +55,9 @@ func (s *Service) AcquireLock(header *Header, body *AcquireLockBody) (*t_api.Acq func (s *Service) Heartbeat(header *Header, body *HeartbeatBody) (*t_api.HeartbeatLocksResponse, error) { util.Assert(body.ProcessId != "", "process_id must be provided") - util.Assert(body.Timeout != 0, "timeout must be provided") HeartbeatLocks := &t_api.HeartbeatLocksRequest{ ProcessId: body.ProcessId, - Timeout: body.Timeout, } cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) diff --git a/internal/app/subsystems/api/service/request.go b/internal/app/subsystems/api/service/request.go index 182771ec..9174b9ef 100644 --- a/internal/app/subsystems/api/service/request.go +++ b/internal/app/subsystems/api/service/request.go @@ -64,15 +64,14 @@ type CreateScheduleBody struct { // LOCKS type AcquireLockBody struct { - ResourceId string `json:"resourceId" binding:"required"` - ProcessId string `json:"processId" binding:"required"` - ExecutionId string `json:"executionId" binding:"required"` - Timeout int64 `json:"timeout" binding:"required"` + ResourceId string `json:"resourceId" binding:"required"` + ProcessId string `json:"processId" binding:"required"` + ExecutionId string `json:"executionId" binding:"required"` + ExpiryInSeconds int64 `json:"expiryInSeconds" binding:"required"` } type HeartbeatBody struct { ProcessId string `json:"processId" binding:"required"` - Timeout int64 `json:"timeout" binding:"required"` } type ReleaseLockBody struct { diff --git a/internal/kernel/t_aio/store.go b/internal/kernel/t_aio/store.go index 4d4e55c0..87a73f0c 100644 --- a/internal/kernel/t_aio/store.go +++ b/internal/kernel/t_aio/store.go @@ -475,15 +475,15 @@ type ReadLockCommand struct { } type AcquireLockCommand struct { - ResourceId string - ProcessId string - ExecutionId string - Timeout int64 + ResourceId string + ProcessId string + ExecutionId string + ExpiryInSeconds int64 + Timeout int64 } type HeartbeatLocksCommand struct { ProcessId string - Timeout int64 } type ReleaseLockCommand struct { diff --git a/internal/kernel/t_api/request.go b/internal/kernel/t_api/request.go index 1d3e18e3..933e26a4 100644 --- a/internal/kernel/t_api/request.go +++ b/internal/kernel/t_api/request.go @@ -141,15 +141,14 @@ type EchoRequest struct { // Locks type AcquireLockRequest struct { - ResourceId string `json:"resourceId"` - ProcessId string `json:"processId"` - ExecutionId string `json:"executionId"` - Timeout int64 `json:"timeout"` + ResourceId string `json:"resourceId"` + ProcessId string `json:"processId"` + ExecutionId string `json:"executionId"` + ExpiryInSeconds int64 `json:"expiryInSeconds"` } type HeartbeatLocksRequest struct { ProcessId string `json:"processId"` - Timeout int64 `json:"timeout"` } type ReleaseLockRequest struct { @@ -258,17 +257,16 @@ func (r *Request) String() string { // LOCKS case AcquireLock: return fmt.Sprintf( - "AcquireLock(resourceId=%s, processId=%s, executionId=%s, timeout=%d)", + "AcquireLock(resourceId=%s, processId=%s, executionId=%s, expiryInSeconds=%d)", r.AcquireLock.ResourceId, r.AcquireLock.ProcessId, r.AcquireLock.ExecutionId, - r.AcquireLock.Timeout, + r.AcquireLock.ExpiryInSeconds, ) case HeartbeatLocks: return fmt.Sprintf( - "HeartbeatLocks(processId=%s, timeout=%d)", + "HeartbeatLocks(processId=%s)", r.HeartbeatLocks.ProcessId, - r.HeartbeatLocks.Timeout, ) case ReleaseLock: diff --git a/pkg/lock/lock.go b/pkg/lock/lock.go index 402a937c..acf37716 100644 --- a/pkg/lock/lock.go +++ b/pkg/lock/lock.go @@ -3,18 +3,20 @@ package lock import "fmt" type Lock struct { - ResourceId string `json:"resourceId"` - ProcessId string `json:"processId"` - ExecutionId string `json:"executionId"` - Timeout int64 `json:"timeout"` + ResourceId string `json:"resourceId"` + ProcessId string `json:"processId"` + ExecutionId string `json:"executionId"` + ExpiryInSeconds int64 `json:"expiryInSeconds"` + ExpiresAt int64 `json:"expiresAt"` } func (l *Lock) String() string { return fmt.Sprintf( - "Lock(resourceId=%s, processId=%s, executionId=%s, timeout=%d)", + "Lock(resourceId=%s, processId=%s, executionId=%s, expiryInSeconds=%d, expiresAt=%d)", l.ResourceId, l.ProcessId, l.ExecutionId, - l.Timeout, + l.ExpiryInSeconds, + l.ExpiresAt, ) } diff --git a/pkg/lock/record.go b/pkg/lock/record.go index c32e1184..485c0fa8 100644 --- a/pkg/lock/record.go +++ b/pkg/lock/record.go @@ -1,17 +1,19 @@ package lock type LockRecord struct { - ResourceId string - ProcessId string - ExecutionId string - Timeout int64 + ResourceId string + ProcessId string + ExecutionId string + ExpiryInSeconds int64 + Timeout int64 } func (r *LockRecord) Lock() (*Lock, error) { return &Lock{ - ResourceId: r.ResourceId, - ProcessId: r.ProcessId, - ExecutionId: r.ExecutionId, - Timeout: r.Timeout, + ResourceId: r.ResourceId, + ProcessId: r.ProcessId, + ExecutionId: r.ExecutionId, + ExpiryInSeconds: r.ExpiryInSeconds, + ExpiresAt: r.Timeout, }, nil } diff --git a/test/dst/generator.go b/test/dst/generator.go index a4b3f562..b3509cd2 100644 --- a/test/dst/generator.go +++ b/test/dst/generator.go @@ -383,28 +383,26 @@ func (g *Generator) GenerateAcquireLock(r *rand.Rand, t int64) *t_api.Request { resourceId := g.idSet[r.Intn(len(g.idSet))] processId := g.idSet[r.Intn(len(g.idSet))] executionId := g.idSet[r.Intn(len(g.idSet))] - timeout := RangeInt63n(r, t, g.ticks*g.timeElapsedPerTick) + expiryInSeconds := RangeInt63n(r, t, g.ticks*g.timeElapsedPerTick) return &t_api.Request{ Kind: t_api.AcquireLock, AcquireLock: &t_api.AcquireLockRequest{ - ResourceId: resourceId, - ProcessId: processId, - ExecutionId: executionId, - Timeout: timeout, + ResourceId: resourceId, + ProcessId: processId, + ExecutionId: executionId, + ExpiryInSeconds: expiryInSeconds, }, } } func (g *Generator) GenerateHeartbeatLocks(r *rand.Rand, t int64) *t_api.Request { processId := g.idSet[r.Intn(len(g.idSet))] - timeout := RangeInt63n(r, t, g.ticks*g.timeElapsedPerTick) return &t_api.Request{ Kind: t_api.HeartbeatLocks, HeartbeatLocks: &t_api.HeartbeatLocksRequest{ ProcessId: processId, - Timeout: timeout, }, } } diff --git a/test/dst/model.go b/test/dst/model.go index de537279..da03ac33 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -615,7 +615,7 @@ func (m *Model) ValidateHeartbeatLocks(t int64, req *t_api.Request, res *t_api.R if l.lock.ProcessId == req.HeartbeatLocks.ProcessId { // update local model for processId's locks owned := m.locks.Get(l.lock.ResourceId) - owned.lock.Timeout = req.HeartbeatLocks.Timeout + owned.lock.ExpiresAt = owned.lock.ExpiresAt + (owned.lock.ExpiryInSeconds * 1000) } } @@ -641,11 +641,10 @@ func (m *Model) ValidateReleaseLock(t int64, req *t_api.Request, res *t_api.Resp return nil } - // todo: come back to once expand dst knowledge. // if lock belongs to the same executionId it must have timedout. - // if lm.lock.Timeout > t { - // return fmt.Errorf("executionId %s still has the lock for resourceId %s", req.ReleaseLock.ExecutionId, req.ReleaseLock.ResourceId) - // } + if lm.lock.ExpiresAt > t { + return fmt.Errorf("executionId %s still has the lock for resourceId %s", req.ReleaseLock.ExecutionId, req.ReleaseLock.ResourceId) + } lm.lock = nil }