Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ export class TypeormAssetProvider extends TypeormProvider implements AssetProvid
#paginationPageSizeLimit: number;

constructor({ paginationPageSizeLimit }: TypeormAssetProviderProps, dependencies: TypeormAssetProviderDependencies) {
const { connectionConfig$, entities, logger } = dependencies;
super('TypeormAssetProvider', { connectionConfig$, entities, logger });
const { connectionConfig$, entities, logger, healthCheckCache } = dependencies;
super('TypeormAssetProvider', { connectionConfig$, entities, healthCheckCache, logger });

this.#dependencies = dependencies;
this.#paginationPageSizeLimit = paginationPageSizeLimit;
this.#nftMetadataService = new TypeOrmNftMetadataService({ connectionConfig$, entities, logger });
this.#nftMetadataService = new TypeOrmNftMetadataService({ connectionConfig$, entities, healthCheckCache, logger });
}

async getAsset({ assetId, extraData }: GetAssetArgs): Promise<Asset.AssetInfo> {
Expand Down
28 changes: 14 additions & 14 deletions packages/cardano-services/src/InMemoryCache/InMemoryCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ export type Key = string | number;
export type AsyncAction<T> = () => Promise<T>;

export class InMemoryCache {
#cache: NodeCache;
#ttlDefault: number;
protected cache: NodeCache;
protected ttlDefault: number;

/**
*
* @param ttl The default time to live in seconds
* @param cache The cache engine. It must extend NodeCache
*/
constructor(ttl: Seconds | 0, cache: NodeCache = new NodeCache()) {
this.#ttlDefault = ttl;
this.#cache = cache;
this.ttlDefault = ttl;
this.cache = cache;
}

/**
Expand All @@ -26,16 +26,16 @@ export class InMemoryCache {
* @param ttl The time to live in seconds
* @returns The value stored with the key
*/
public async get<T>(key: Key, asyncAction: AsyncAction<T>, ttl = this.#ttlDefault): Promise<T> {
const cachedValue: T | undefined = this.#cache.get(key);
public async get<T>(key: Key, asyncAction: AsyncAction<T>, ttl = this.ttlDefault): Promise<T> {
const cachedValue: T | undefined = this.cache.get(key);
if (cachedValue) {
return cachedValue;
}

const resultPromise = asyncAction();
this.set(
key,
resultPromise.catch(() => this.#cache.del(key)),
resultPromise.catch(() => this.cache.del(key)),
ttl
);
return resultPromise;
Expand All @@ -48,7 +48,7 @@ export class InMemoryCache {
* @returns The value stored in the key
*/
public getVal<T>(key: Key) {
return this.#cache.get<T>(key);
return this.cache.get<T>(key);
}

/**
Expand All @@ -59,8 +59,8 @@ export class InMemoryCache {
* @param ttl The time to live in seconds
* @returns The success state of the operation
*/
public set<T>(key: Key, value: T, ttl = this.#ttlDefault) {
return this.#cache.set<T>(key, value, ttl);
public set<T>(key: Key, value: T, ttl = this.ttlDefault) {
return this.cache.set<T>(key, value, ttl);
}

/**
Expand All @@ -69,7 +69,7 @@ export class InMemoryCache {
* @param keys cache key to delete or a array of cache keys
*/
public invalidate(keys: Key | Key[]) {
this.#cache.del(keys);
this.cache.del(keys);
}

/**
Expand All @@ -78,16 +78,16 @@ export class InMemoryCache {
* @returns An array of all keys
*/
public keys() {
return this.#cache.keys();
return this.cache.keys();
}

/** Clear the interval timeout which is set on check period option. Default: 600 */
public shutdown() {
this.#cache.close();
this.cache.close();
}

/** Clear the whole data and reset the stats */
public clear() {
this.#cache.flushAll();
this.cache.flushAll();
}
}
98 changes: 98 additions & 0 deletions packages/cardano-services/src/InMemoryCache/WarmCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { AsyncAction, InMemoryCache, Key } from './InMemoryCache';
import { Seconds } from '@cardano-sdk/core';
import NodeCache from 'node-cache';

interface WarmCacheItem<T> {
asyncAction: AsyncAction<T>;
value: T;
ttl: number;
updateTime: number;
}

export class WarmCache extends InMemoryCache {
constructor(ttl: Seconds, expireCheckPeriod: Seconds) {
const cache = new NodeCache({
checkperiod: expireCheckPeriod,
deleteOnExpire: true,
stdTTL: ttl,
useClones: false
});
super(ttl, cache);

this.cache.on('expired', (key, value) => {
this.#warm(key, value, this.cache);
});
}

public mockCache(cache: NodeCache) {
this.cache = cache;
}
public async get<T>(key: Key, asyncAction: AsyncAction<T>, ttl = this.ttlDefault): Promise<T> {
const cachedValue: WarmCacheItem<T> | undefined = this.cache.get(key);

if (cachedValue && cachedValue.value) {
return cachedValue.value;
}

const updateTime = Date.now();
const promise = this.#setWarmCacheItem<T>(key, asyncAction, ttl, this.cache, updateTime);

this.cache.set(
key,
{
asyncAction,
ttl,
updateTime,
value: promise
// value: _resolved ? Promise.resolve(value) : Promise.reject(value)
} as WarmCacheItem<T>,
ttl
);

return promise;
}

#warm<T>(key: string, item: WarmCacheItem<T> | undefined, cacheNode: NodeCache) {
if (item && item.asyncAction) {
this.#setWarmCacheItem(key, item.asyncAction, item.ttl, cacheNode, Date.now()).catch(
() => 'rejected in the background'
);
}
}

async #setWarmCacheItem<T>(
key: Key,
asyncAction: AsyncAction<T>,
ttl: number,
cacheNode: NodeCache,
updateTime: number
) {
const handleValue = (value: T, _resolved = true) => {
const item = this.cache.get(key) as WarmCacheItem<T>;
if (item && item.updateTime > updateTime) {
return item.value;
}
const promise = _resolved ? Promise.resolve(value) : Promise.reject(value);

cacheNode.set(
key,
{
asyncAction,
ttl,
updateTime,
value: promise
// value: _resolved ? Promise.resolve(value) : Promise.reject(value)
} as WarmCacheItem<T>,
ttl
);
return promise;
};

try {
const value = await asyncAction();
return handleValue(value, true);
} catch (error) {
return handleValue(error as T, false);
}
}
}
22 changes: 19 additions & 3 deletions packages/cardano-services/src/Program/programs/providerServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { ProviderServerArgs, ProviderServerOptionDescriptions, ServiceNames } fr
import { SrvRecord } from 'dns';
import { TypeormAssetProvider } from '../../Asset/TypeormAssetProvider';
import { TypeormStakePoolProvider } from '../../StakePool/TypeormStakePoolProvider/TypeormStakePoolProvider';
import { WarmCache } from '../../InMemoryCache/WarmCache';
import { createDbSyncMetadataService } from '../../Metadata';
import { createLogger } from 'bunyan';
import { getConnectionConfig, getOgmiosObservableCardanoNode } from '../services';
Expand Down Expand Up @@ -99,10 +100,12 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
};

const getCache = (ttl: Seconds | 0) => (args.disableDbCache ? new NoCache() : new InMemoryCache(ttl));
const getWarmCache = (ttl: Seconds) => (args.disableDbCache ? new NoCache() : new WarmCache(ttl, Seconds(ttl / 10)));

const getDbCache = () => getCache(args.dbCacheTtl);

// Shared cache across all providers
const healthCheckCache = getCache(args.healthCheckCacheTtl);
const healthCheckCache = getWarmCache(args.healthCheckCacheTtl);

const getEpochMonitor = memoize((dbPool: Pool) => new DbSyncEpochPollService(dbPool, args.epochPollInterval!));

Expand Down Expand Up @@ -142,7 +145,13 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
const getTypeormStakePoolProvider = withTypeOrmProvider('StakePool', (connectionConfig$) => {
const entities = getEntities(['currentPoolMetrics', 'poolDelisted', 'poolMetadata', 'poolRewards']);

return new TypeormStakePoolProvider(args, { cache: getDbCache(), connectionConfig$, entities, logger });
return new TypeormStakePoolProvider(args, {
cache: getDbCache(),
connectionConfig$,
entities,
healthCheckCache,
logger
});
});

const getNetworkInfoProvider = (cardanoNode: CardanoNode, dbPools: DbPools) => {
Expand Down Expand Up @@ -173,7 +182,12 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
sharedHandleProvider = await withTypeOrmProvider(
'Handle',
async (connectionConfig$) =>
new TypeOrmHandleProvider({ connectionConfig$, entities: getEntities(['handle', 'handleMetadata']), logger })
new TypeOrmHandleProvider({
connectionConfig$,
entities: getEntities(['handle', 'handleMetadata']),
healthCheckCache,
logger
})
)();

return sharedHandleProvider;
Expand All @@ -198,6 +212,7 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
{
connectionConfig$,
entities: getEntities(['asset']),
healthCheckCache,
logger,
tokenMetadataService
}
Expand Down Expand Up @@ -304,6 +319,7 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
: new NodeTxSubmitProvider({
cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args),
handleProvider: args.submitValidateHandles ? await getHandleProvider() : undefined,
healthCheckCache,
logger
});
return new TxSubmitHttpService({ logger, txSubmitProvider });
Expand Down
12 changes: 10 additions & 2 deletions packages/cardano-services/src/TxSubmit/NodeTxSubmitProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import {
TxSubmissionError,
TxSubmitProvider
} from '@cardano-sdk/core';
import { InMemoryCache } from '../InMemoryCache';
import { Logger } from 'ts-log';
import { WithLogger } from '@cardano-sdk/util';

type ObservableTxSubmitter = Pick<ObservableCardanoNode, 'healthCheck$' | 'submitTx'>;
export type NodeTxSubmitProviderProps = WithLogger & {
handleProvider?: HandleProvider;
cardanoNode: ObservableTxSubmitter;
healthCheckCache: InMemoryCache;
};

const emptyMessage = 'ObservableCardanoNode observable completed without emitting';
Expand Down Expand Up @@ -49,19 +51,21 @@ export class NodeTxSubmitProvider implements TxSubmitProvider {
#logger: Logger;
#cardanoNode: ObservableTxSubmitter;
#handleProvider?: HandleProvider;
#healthCheckCache: InMemoryCache;

constructor({ handleProvider, cardanoNode, logger }: NodeTxSubmitProviderProps) {
constructor({ handleProvider, cardanoNode, logger, healthCheckCache }: NodeTxSubmitProviderProps) {
this.#handleProvider = handleProvider;
this.#cardanoNode = cardanoNode;
this.#logger = logger;
this.#healthCheckCache = healthCheckCache;
}

async submitTx({ signedTransaction, context }: SubmitTxArgs): Promise<void> {
await this.#throwIfHandleResolutionConflict(context);
await firstValueFrom(this.#cardanoNode.submitTx(signedTransaction)).catch(toProviderError);
}

async healthCheck(): Promise<HealthCheckResponse> {
async #checkHealth(): Promise<HealthCheckResponse> {
const [cardanoNodeHealth, handleProviderHealth] = await Promise.all([
firstValueFrom(this.#cardanoNode.healthCheck$).catch((error): HealthCheckResponse => {
if (error instanceof EmptyError) {
Expand All @@ -79,6 +83,10 @@ export class NodeTxSubmitProvider implements TxSubmitProvider {
};
}

async healthCheck(): Promise<HealthCheckResponse> {
return this.#healthCheckCache.get('ogmios_cardano_node', () => this.#checkHealth());
}

async #throwIfHandleResolutionConflict(context: SubmitTxArgs['context']): Promise<void> {
if (context?.handleResolutions && context.handleResolutions.length > 0) {
if (!this.#handleProvider) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
import { HealthCheckResponse, Milliseconds, Provider } from '@cardano-sdk/core';
import { InMemoryCache } from '../../InMemoryCache';
import { TypeormService, TypeormServiceDependencies } from '../TypeormService';
import { skip } from 'rxjs';

export type TypeormProviderDependencies = Omit<TypeormServiceDependencies, 'connectionTimeout'>;
export type TypeormProviderDependencies = Omit<TypeormServiceDependencies, 'connectionTimeout'> & {
healthCheckCache: InMemoryCache;
};

const unhealthy = { ok: false, reason: 'Provider error' };

export abstract class TypeormProvider extends TypeormService implements Provider {
health: HealthCheckResponse = { ok: false, reason: 'not started' };
healthCheckCache: InMemoryCache;

constructor(name: string, dependencies: TypeormProviderDependencies) {
super(name, { ...dependencies, connectionTimeout: Milliseconds(1000) });
// We skip 1 to omit the initial null value of the subject
this.dataSource$.pipe(skip(1)).subscribe((dataSource) => {
this.health = dataSource ? { ok: true } : unhealthy;
});

this.healthCheckCache = dependencies.healthCheckCache;
}

async healthCheck(): Promise<HealthCheckResponse> {
if (this.state === 'running')
try {
await this.withDataSource((dataSource) => dataSource.query('SELECT 1'));
await this.healthCheckCache.get(`typeorm_db_${this.name}`, () =>
this.withDataSource((dataSource) => dataSource.query('SELECT 1'))
);
} catch {
return unhealthy;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Cardano, util } from '@cardano-sdk/core';
import { TypeOrmNftMetadataService, createDnsResolver, getConnectionConfig, getEntities } from '../../src';
import { NoCache, TypeOrmNftMetadataService, createDnsResolver, getConnectionConfig, getEntities } from '../../src';
import { logger, mockProviders } from '@cardano-sdk/util-dev';

describe('TypeOrmNftMetadataService', () => {
Expand All @@ -11,7 +11,7 @@ describe('TypeOrmNftMetadataService', () => {
const connectionConfig$ = getConnectionConfig(dnsResolver, 'test', 'Asset', {
postgresConnectionStringAsset: process.env.POSTGRES_CONNECTION_STRING_ASSET!
});
service = new TypeOrmNftMetadataService({ connectionConfig$, entities, logger });
service = new TypeOrmNftMetadataService({ connectionConfig$, entities, healthCheckCache: new NoCache(), logger });
await service.initialize();
await service.start();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Asset, Cardano } from '@cardano-sdk/core';
import { AssetEntity, NftMetadataEntity } from '@cardano-sdk/projection-typeorm';
import { IsNull, Not } from 'typeorm';
import { Logger } from 'ts-log';
import { NoCache } from '../../../src';
import { TypeormProvider, TypeormProviderDependencies } from '../../../src/util';

export enum TypeormAssetWith {
Expand All @@ -12,7 +13,7 @@ export class TypeormAssetFixtureBuilder extends TypeormProvider {
#logger: Logger;

constructor({ connectionConfig$, entities, logger }: TypeormProviderDependencies) {
super('TypeormAssetFixtureBuilder', { connectionConfig$, entities, logger });
super('TypeormAssetFixtureBuilder', { connectionConfig$, entities, healthCheckCache: new NoCache(), logger });
this.#logger = logger;
}

Expand Down
Loading