Skip to content

Commit 7a7a817

Browse files
authored
Merge pull request #55 from Mitch5000/Distributed-lock-TTL-expires-during-long-Stellar-operations--FIXED
Distributed lock TTL expires during long Stellar operations FIXED
2 parents f3630ef + 7e93b50 commit 7a7a817

2 files changed

Lines changed: 150 additions & 1 deletion

File tree

src/utils/lock.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import { redis } from "../config/redis.js";
22
import { ConflictError } from "./errors.js";
33
import crypto from "node:crypto";
4+
import { logger } from "./logger.js";
45

56
export async function withLock<T>(
67
key: string,
78
fn: () => Promise<T>,
8-
ttlMs: number = 10_000
9+
ttlMs: number = 30_000
910
): Promise<T> {
1011
const lockKey = `lock:${key}`;
1112
const lockValue = crypto.randomUUID();
@@ -21,9 +22,34 @@ export async function withLock<T>(
2122
throw new ConflictError("Operation in progress, please retry");
2223
}
2324

25+
let heartbeat: NodeJS.Timeout | undefined;
26+
2427
try {
28+
heartbeat = setInterval(async () => {
29+
try {
30+
const script = `
31+
if redis.call("get", KEYS[1]) == ARGV[1] then
32+
return redis.call("pexpire", KEYS[1], ARGV[2])
33+
else
34+
return 0
35+
end
36+
`;
37+
const result = await redis.eval(script, 1, lockKey, lockValue, ttlMs);
38+
if (result !== 1) {
39+
logger.warn({ lockKey }, "Lock renewal failed: lock lost or changed");
40+
if (heartbeat) {
41+
clearInterval(heartbeat);
42+
heartbeat = undefined;
43+
}
44+
}
45+
} catch (err) {
46+
logger.error({ err, lockKey }, "Error during lock renewal heartbeat");
47+
}
48+
}, ttlMs / 2);
49+
2550
return await fn();
2651
} finally {
52+
if (heartbeat) clearInterval(heartbeat);
2753
const script = `
2854
if redis.call("get", KEYS[1]) == ARGV[1] then
2955
return redis.call("del", KEYS[1])

tests/unit/utils/lock.test.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
2+
import { withLock } from "../../../src/utils/lock.js";
3+
import { redis } from "../../../src/config/redis.js";
4+
import { ConflictError } from "../../../src/utils/errors.js";
5+
6+
vi.mock("../../../src/config/redis.js", () => ({
7+
redis: {
8+
set: vi.fn(),
9+
eval: vi.fn(),
10+
},
11+
}));
12+
13+
vi.mock("../../../src/utils/logger.js", () => ({
14+
logger: {
15+
warn: vi.fn(),
16+
error: vi.fn(),
17+
info: vi.fn(),
18+
},
19+
}));
20+
21+
describe("withLock", () => {
22+
beforeEach(() => {
23+
vi.clearAllMocks();
24+
vi.useFakeTimers();
25+
});
26+
27+
afterEach(() => {
28+
vi.useRealTimers();
29+
});
30+
31+
it("should acquire lock and release it after execution", async () => {
32+
(redis.set as any).mockResolvedValue("OK");
33+
(redis.eval as any).mockResolvedValue(1);
34+
35+
const fn = vi.fn().mockResolvedValue("result");
36+
const result = await withLock("test", fn);
37+
38+
expect(result).toBe("result");
39+
expect(redis.set).toHaveBeenCalledWith(
40+
"lock:test",
41+
expect.any(String),
42+
"PX",
43+
30000,
44+
"NX"
45+
);
46+
// The release eval call should have happened
47+
expect(redis.eval).toHaveBeenCalled();
48+
});
49+
50+
it("should throw ConflictError if lock cannot be acquired", async () => {
51+
(redis.set as any).mockResolvedValue(null);
52+
53+
const fn = vi.fn();
54+
await expect(withLock("test", fn)).rejects.toThrow("Operation in progress, please retry");
55+
expect(fn).not.toHaveBeenCalled();
56+
});
57+
58+
it("should renew lock via heartbeat", async () => {
59+
(redis.set as any).mockResolvedValue("OK");
60+
(redis.eval as any).mockResolvedValue(1);
61+
62+
let resolveFn: any;
63+
const promise = new Promise((resolve) => {
64+
resolveFn = resolve;
65+
});
66+
67+
const fn = vi.fn().mockReturnValue(promise);
68+
69+
const lockPromise = withLock("test", fn, 10000);
70+
71+
// Advance time to trigger heartbeat (ttlMs / 2 = 5000)
72+
await vi.advanceTimersByTimeAsync(5001);
73+
74+
// Check if eval was called for renewal
75+
expect(redis.eval).toHaveBeenCalledWith(
76+
expect.stringContaining("pexpire"),
77+
1,
78+
"lock:test",
79+
expect.any(String),
80+
10000
81+
);
82+
83+
// Trigger another heartbeat
84+
await vi.advanceTimersByTimeAsync(5001);
85+
// Now it should have been called twice for renewal
86+
expect(redis.eval).toHaveBeenCalledTimes(2);
87+
88+
resolveFn("done");
89+
await lockPromise;
90+
91+
// After completion, it should have called eval one more time for release
92+
// But since release is also an eval call, total should be 3
93+
expect(redis.eval).toHaveBeenCalledTimes(3);
94+
});
95+
96+
it("should stop heartbeat if renewal fails", async () => {
97+
(redis.set as any).mockResolvedValue("OK");
98+
// First renewal returns 0 (lock lost)
99+
(redis.eval as any).mockResolvedValue(0);
100+
101+
let resolveFn: any;
102+
const promise = new Promise((resolve) => {
103+
resolveFn = resolve;
104+
});
105+
106+
const fn = vi.fn().mockReturnValue(promise);
107+
108+
const lockPromise = withLock("test", fn, 10000);
109+
110+
await vi.advanceTimersByTimeAsync(5001);
111+
expect(redis.eval).toHaveBeenCalledTimes(1);
112+
113+
// Advance more time, should NOT call eval again because heartbeat should be cleared
114+
await vi.advanceTimersByTimeAsync(5001);
115+
expect(redis.eval).toHaveBeenCalledTimes(1);
116+
117+
resolveFn("done");
118+
await lockPromise;
119+
120+
// Release call happens at the end
121+
expect(redis.eval).toHaveBeenCalledTimes(2);
122+
});
123+
});

0 commit comments

Comments
 (0)