Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.dev
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
LOG_PRETTY=true
HTTP_SHUTDOWN_DELAY=0
2 changes: 2 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
LOG_LEVEL=mute
LOG_PRETTY=false
HTTP_SHUTDOWN_DELAY=0

STATS_UPDATER_ENABLED=false
521 changes: 375 additions & 146 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"license": "ISC",
"devDependencies": {
"@nodescript/cli": "^1.9.0",
"@nodescript/eslint-config": "^2.0.1",
"@nodescript/eslint-config": "^2.1.0",
"@types/mocha": "^8.2.3",
"@types/node": "^18.19.23",
"dotenv": "^16.4.5",
Expand Down
15 changes: 15 additions & 0 deletions src/main/app.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { AuxHttpServer, BaseApp, JwtService } from '@nodescript/microframework';
import { dep, Mesh } from 'mesh-ioc';

import { CacheStatsStorage } from './global/CacheStatsStorage.js';
import { MongoCacheStatsStorage } from './global/CacheStatsStorage.mongo.js';
import { CacheStorage } from './global/CacheStorage.js';
import { MongoCacheStorage } from './global/CacheStorage.mongo.js';
import { MainHttpServer } from './global/MainHttpServer.js';
import { Metrics } from './global/Metrics.js';
import { MongoDb } from './global/MongoDb.js';
import { RedisManager } from './global/RedisManager.js';
import { StatsUpdater } from './global/StatsUpdater.js';

export class App extends BaseApp {

Expand All @@ -15,31 +18,43 @@ export class App extends BaseApp {
@dep() private mainHttpServer!: MainHttpServer;
@dep() private auxHttpServer!: AuxHttpServer;
@dep() private cacheStorage!: CacheStorage;
@dep() private cacheStatsStorage!: CacheStatsStorage;
@dep() private statsUpdater!: StatsUpdater;

constructor() {
super(new Mesh('App'));
this.mesh.service(AuxHttpServer);
this.mesh.service(CacheStorage, MongoCacheStorage);
this.mesh.service(CacheStatsStorage, MongoCacheStatsStorage);
this.mesh.service(JwtService);
this.mesh.service(MainHttpServer);
this.mesh.service(Metrics);
this.mesh.service(MongoDb);
this.mesh.service(RedisManager);
this.mesh.service(StatsUpdater);
}

override async start() {
await super.start();

await this.mongodb.start();
await this.redis.start();

await this.cacheStorage.setup();
await this.cacheStatsStorage.setup();

await this.mainHttpServer.start();
await this.auxHttpServer.start();
await this.statsUpdater.start();
}

override async stop() {
await super.stop();

await this.statsUpdater.stop();
await this.mainHttpServer.stop();
await this.auxHttpServer.stop();

await this.mongodb.stop();
await this.redis.stop();
}
Expand Down
84 changes: 84 additions & 0 deletions src/main/global/CacheStatsStorage.mongo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { Logger } from '@nodescript/logger';
import { dep } from 'mesh-ioc';

import { CacheStats, CacheStatsStorage } from './CacheStatsStorage.js';
import { MongoDb } from './MongoDb.js';

interface MongoCacheStats {
workspaceId: string;
count: number;
size: number;
lastUpdatedAt?: Date;
}

export class MongoCacheStatsStorage extends CacheStatsStorage {

@dep() private logger!: Logger;
@dep() private mongodb!: MongoDb;

private get collection() {
return this.mongodb.db.collection<MongoCacheStats>('cachestats');
}

async setup(): Promise<void> {
await this.collection.createIndex({
workspaceId: 1,
}, { unique: true });
this.logger.info('Created indexes on cachestats');
}

async *getAllStats(): AsyncIterable<CacheStats> {
const cursor = this.collection.find();
for await (const doc of cursor) {
yield this.deserialize(doc);
}
}

async getStats(workspaceId: string): Promise<CacheStats> {
const doc = await this.collection.findOne({ workspaceId });
if (!doc) {
return {
workspaceId,
count: 0,
size: 0,
lastUpdatedAt: 0,
};
}
return this.deserialize(doc);
}

async incrUsage(workspaceId: string, count: number, size: number): Promise<void> {
await this.collection.updateOne({
workspaceId
}, {
$inc: { count, size },
$set: { lastUpdatedAt: new Date() },
}, {
upsert: true,
});
}

async updateUsage(workspaceId: string, count: number, size: number): Promise<void> {
await this.collection.updateOne({
workspaceId,
}, {
$set: {
count,
size,
lastUpdatedAt: new Date(),
},
}, {
upsert: true,
});
}

private deserialize(doc: MongoCacheStats): CacheStats {
return {
workspaceId: doc.workspaceId,
count: doc.count,
size: doc.size,
lastUpdatedAt: doc.lastUpdatedAt?.valueOf(),
};
}

}
30 changes: 30 additions & 0 deletions src/main/global/CacheStatsStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
export interface CacheStats {
workspaceId: string;
count: number;
size: number;
lastUpdatedAt?: number;
}

export abstract class CacheStatsStorage {

abstract setup(): Promise<void>;

abstract getAllStats(): AsyncIterable<CacheStats>;

abstract getStats(
workspaceId: string,
): Promise<CacheStats>;

abstract incrUsage(
workspaceId: string,
count: number,
size: number,
): Promise<void>;

abstract updateUsage(
workspaceId: string,
count: number,
size: number,
): Promise<void>;

}
3 changes: 1 addition & 2 deletions src/main/global/CacheStorage.mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ export class MongoCacheStorage extends CacheStorage {
return expired ? null : this.deserialize(doc);
}

async checkCacheUsage(workspaceId: string, key: string): Promise<CacheUsageStats> {
async calcUsage(workspaceId: string): Promise<CacheUsageStats> {
const res = await this.collection.aggregate([
{
$match: {
workspaceId,
key: { $ne: key },
},
},
{
Expand Down
3 changes: 1 addition & 2 deletions src/main/global/CacheStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ export abstract class CacheStorage {
key: string,
): Promise<CacheData | null>;

abstract checkCacheUsage(
abstract calcUsage(
workspaceId: string,
key: string,
): Promise<CacheUsageStats>;

abstract upsertData(
Expand Down
59 changes: 59 additions & 0 deletions src/main/global/StatsUpdater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { Logger } from '@nodescript/logger';
import { config } from 'mesh-config';
import { dep } from 'mesh-ioc';

import { CacheStatsStorage } from './CacheStatsStorage.js';
import { CacheStorage } from './CacheStorage.js';

export class StatsUpdater {

@config({ default: 60_000 })
private STATS_UPDATER_INTERVAL_MS!: number;

@config({ default: true })
private STATS_UPDATER_ENABLED!: boolean;

@dep() private logger!: Logger;
@dep() private cacheStatsStorage!: CacheStatsStorage;
@dep() private cacheStorage!: CacheStorage;

private running = false;
private runPromise: Promise<void> | null = null;

async start() {
if (!this.STATS_UPDATER_ENABLED) {
return;
}
if (this.running) {
return;
}
this.running = true;
this.runPromise = this.run();
}

async stop() {
this.running = false;
await this.runPromise;
}

private async run() {
while (this.running) {
await this.updateStats();
await new Promise(resolve => setTimeout(resolve, this.STATS_UPDATER_INTERVAL_MS));
}
}

private async updateStats() {
try {
this.logger.info('Updating stats');
for await (const stat of this.cacheStatsStorage.getAllStats()) {
const actualUsage = await this.cacheStorage.calcUsage(stat.workspaceId);
await this.cacheStatsStorage.updateUsage(stat.workspaceId, actualUsage.count, actualUsage.size);
}
this.logger.info('Stats updated');
} catch (error) {
this.logger.error('Error updating stats', { error });
}
}

}
36 changes: 28 additions & 8 deletions src/main/scoped/CacheDomainImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { AccessDeniedError } from '@nodescript/errors';
import { config } from 'mesh-config';
import { dep } from 'mesh-ioc';

import { CacheStatsStorage } from '../global/CacheStatsStorage.js';
import { CacheStorage } from '../global/CacheStorage.js';
import { RedisManager } from '../global/RedisManager.js';
import { AuthContext } from './AuthContext.js';
Expand Down Expand Up @@ -34,6 +35,7 @@ export class CacheDomainImpl implements CacheDomain {

@dep() private authContext!: AuthContext;
@dep() private cacheStorage!: CacheStorage;
@dep() private cacheStatsStorage!: CacheStatsStorage;
@dep() private nsApi!: NodeScriptApi;
@dep() private redis!: RedisManager;

Expand All @@ -59,18 +61,32 @@ export class CacheDomainImpl implements CacheDomain {
const maxKeys = Number(workspace.metadata.cacheMaxKeys) || this.CACHE_MAX_KEYS;
const maxSize = Number(workspace.metadata.cacheMaxSize) || this.CACHE_MAX_SIZE;
const maxEntrySize = Number(workspace.metadata.cacheMaxEntrySize) || this.CACHE_MAX_ENTRY_SIZE;
const usage = await this.cacheStorage.checkCacheUsage(token.workspaceId, req.key);
const expiresAt = this.evalExpirationTime(req.expiresAt);
if ((usage.count + 1) > maxKeys) {
// Note: since we store aggregated stats, we need to also fetch data to understand how to update the stats
const data = await this.cacheStorage.getData(token.workspaceId, req.key);
const isNew = data == null;
const stats = await this.cacheStatsStorage.getStats(token.workspaceId);
// Max keys check
if (isNew && (stats.count + 1) > maxKeys) {
throw new AccessDeniedError('Maximum number of keys in cache reached');
}
// Max size check
const buffer = Buffer.from(JSON.stringify(req.data), 'utf-8');
if ((usage.size + buffer.byteLength) > maxSize) {
throw new AccessDeniedError('Maximum size of data in cache reached');
}
if (buffer.byteLength > maxEntrySize) {
const oldSize = isNew ? 0 : data.size;
const newSize = buffer.byteLength;
if (newSize > maxEntrySize) {
throw new AccessDeniedError(`Entry cannot exceed ${maxEntrySize} bytes`);
}
const sizeDelta = newSize - oldSize;
if ((stats.size + sizeDelta) > maxSize) {
throw new AccessDeniedError('Maximum size of data in cache reached');
}
// Update stats
// Note: we're not super accurate here (esp. around race conditions),
// we just need to have some rough limits to prevent abuse.
// The exact stats are re-calculated periodically to remove accumulated inaccuracies.
await this.cacheStatsStorage.incrUsage(token.workspaceId, isNew ? 1 : 0, sizeDelta);
// Write data
const expiresAt = this.evalExpirationTime(req.expiresAt);
await this.cacheStorage.upsertData(token.workspaceId, req.key, buffer, expiresAt);
return {};
}
Expand All @@ -82,7 +98,11 @@ export class CacheDomainImpl implements CacheDomain {
}
await this.checkRateLimit(token.workspaceId);
this.authContext.requirePermissions([Permission.WORKSPACE_CACHE_WRITE]);
this.cacheStorage.deleteData(token.workspaceId, req.key);
const data = await this.cacheStorage.getData(token.workspaceId, req.key);
await this.cacheStorage.deleteData(token.workspaceId, req.key);
if (data) {
await this.cacheStatsStorage.incrUsage(token.workspaceId, -1, -data.size);
}
return {};
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scoped/NodeScriptApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class NodeScriptApi {
@config({ default: 100_000 })
WORKSPACE_CACHE_MAX_COUNT!: number;

@config({ default: 5_000 })
@config({ default: 60_000 })
WORKSPACE_CACHE_TTL!: number;

@dep() private ctx!: HttpContext;
Expand Down
Loading