Skip to content

Commit

Permalink
Create concurrency-limit.test.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
nickscamara committed Jan 30, 2025
1 parent b44358a commit 1893608
Showing 1 changed file with 239 additions and 0 deletions.
239 changes: 239 additions & 0 deletions apps/api/src/__tests__/concurrency-limit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
import { redisConnection } from "../services/queue-service";
import {
cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
pushConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
takeConcurrencyLimitedJob,
pushConcurrencyLimitedJob,
getConcurrencyQueueJobsCount,
ConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { getConcurrencyLimitMax } from "../services/rate-limiter";
import { PlanType } from "../types";

// Mock Redis client
jest.mock("../services/queue-service", () => ({
redisConnection: {
zremrangebyscore: jest.fn(),
zrangebyscore: jest.fn(),
zadd: jest.fn(),
zrem: jest.fn(),
zmpop: jest.fn(),
zcard: jest.fn(),
},
}));

describe("Concurrency Limit", () => {
const mockTeamId = "test-team-id";
const mockJobId = "test-job-id";
const mockNow = 1000000;

beforeEach(() => {
jest.clearAllMocks();
});

describe("cleanOldConcurrencyLimitEntries", () => {
it("should remove entries older than current timestamp", async () => {
await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);

expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
-Infinity,
mockNow
);
});
});

describe("getConcurrencyLimitActiveJobs", () => {
it("should return active jobs after given timestamp", async () => {
const mockActiveJobs = ["job1", "job2"];
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(mockActiveJobs);

const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);

expect(result).toEqual(mockActiveJobs);
expect(redisConnection.zrangebyscore).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
mockNow,
Infinity
);
});

it("should return empty array when no active jobs", async () => {
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);

const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);

expect(result).toEqual([]);
});
});

describe("pushConcurrencyLimitActiveJob", () => {
it("should add job with expiration timestamp", async () => {
await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, mockNow);

expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
mockNow + 2 * 60 * 1000, // stalledJobTimeoutMs
mockJobId
);
});
});

describe("removeConcurrencyLimitActiveJob", () => {
it("should remove job from active jobs", async () => {
await removeConcurrencyLimitActiveJob(mockTeamId, mockJobId);

expect(redisConnection.zrem).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
mockJobId
);
});
});

describe("Queue Operations", () => {
const mockJob: ConcurrencyLimitedJob = {
id: mockJobId,
data: { test: "data" },
opts: {},
priority: 1,
};

describe("takeConcurrencyLimitedJob", () => {
it("should return null when queue is empty", async () => {
(redisConnection.zmpop as jest.Mock).mockResolvedValue(null);

const result = await takeConcurrencyLimitedJob(mockTeamId);

expect(result).toBeNull();
});

it("should return and remove the highest priority job", async () => {
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
"key",
[[JSON.stringify(mockJob)]],
]);

const result = await takeConcurrencyLimitedJob(mockTeamId);

expect(result).toEqual(mockJob);
expect(redisConnection.zmpop).toHaveBeenCalledWith(
1,
"concurrency-limit-queue:test-team-id",
"MIN"
);
});
});

describe("pushConcurrencyLimitedJob", () => {
it("should add job to queue with priority", async () => {
await pushConcurrencyLimitedJob(mockTeamId, mockJob);

expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limit-queue:test-team-id",
mockJob.priority,
JSON.stringify(mockJob)
);
});

it("should use default priority 1 when not specified", async () => {
const jobWithoutPriority = { ...mockJob };
delete jobWithoutPriority.priority;

await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority);

expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limit-queue:test-team-id",
1,
JSON.stringify(jobWithoutPriority)
);
});
});

describe("getConcurrencyQueueJobsCount", () => {
it("should return the number of jobs in queue", async () => {
const mockCount = 5;
(redisConnection.zcard as jest.Mock).mockResolvedValue(mockCount);

const result = await getConcurrencyQueueJobsCount(mockTeamId);

expect(result).toBe(mockCount);
expect(redisConnection.zcard).toHaveBeenCalledWith(
"concurrency-limit-queue:test-team-id"
);
});

it("should return 0 for empty queue", async () => {
(redisConnection.zcard as jest.Mock).mockResolvedValue(0);

const result = await getConcurrencyQueueJobsCount(mockTeamId);

expect(result).toBe(0);
});
});
});

describe("getConcurrencyLimitMax", () => {
it("should return correct limit for free plan", () => {
const result = getConcurrencyLimitMax("free");
expect(result).toBe(2);
});

it("should return correct limit for standard plan", () => {
const result = getConcurrencyLimitMax("standard");
expect(result).toBe(10);
});

it("should return correct limit for scale plan", () => {
const result = getConcurrencyLimitMax("scale");
expect(result).toBe(100);
});

it("should return default limit for unknown plan", () => {
const result = getConcurrencyLimitMax("unknown" as PlanType);
expect(result).toBe(10);
});

it("should handle special team IDs", () => {
process.env.DEV_B_TEAM_ID = "dev-b-team";
const result = getConcurrencyLimitMax("free", "dev-b-team");
expect(result).toBe(120);
});
});

describe("Integration Scenarios", () => {
it("should handle complete job lifecycle", async () => {
const mockJob: ConcurrencyLimitedJob = {
id: "lifecycle-test",
data: { test: "lifecycle" },
opts: {},
};

// Push job to queue
await pushConcurrencyLimitedJob(mockTeamId, mockJob);
expect(redisConnection.zadd).toHaveBeenCalled();

// Take job from queue
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
"key",
[[JSON.stringify(mockJob)]],
]);
const takenJob = await takeConcurrencyLimitedJob(mockTeamId);
expect(takenJob).toEqual(mockJob);

// Add to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, mockNow);
expect(redisConnection.zadd).toHaveBeenCalled();

// Verify active jobs
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([mockJob.id]);
const activeJobs = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
expect(activeJobs).toContain(mockJob.id);

// Remove from active jobs
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
expect(redisConnection.zrem).toHaveBeenCalled();
});
});
});

0 comments on commit 1893608

Please sign in to comment.