Skip to content

Commit dda9794

Browse files
feat!: coldObservableProvider now stops after maxRetries (defaults to 10) and logs errors
- coldObservableProvider constructor now requires a logger object
1 parent fb58bf2 commit dda9794

20 files changed

+133
-17
lines changed

packages/tx-construction/src/tx-builder/TxBuilder.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ export class GenericTxBuilder implements TxBuilder {
500500
}
501501

502502
const rewardAccounts$ = coldObservableProvider({
503+
logger: contextLogger(this.#logger, 'getOrCreateRewardAccounts'),
503504
pollUntil: (rewardAccounts) =>
504505
allRewardAccounts.every((newAccount) => rewardAccounts.some((acct) => acct.address === newAccount)),
505506
provider: this.#dependencies.txBuilderProviders.rewardAccounts,

packages/util-rxjs/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
"dependencies": {
4141
"@cardano-sdk/util": "workspace:~",
4242
"backoff-rxjs": "^7.0.0",
43-
"rxjs": "^7.4.0"
43+
"rxjs": "^7.4.0",
44+
"ts-log": "^2.2.7"
4445
},
4546
"devDependencies": {
4647
"@cardano-sdk/util-dev": "workspace:~",

packages/util-rxjs/src/coldObservableProvider.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { InvalidStringError, strictEquals } from '@cardano-sdk/util';
2+
import { Logger } from 'ts-log';
23
import {
34
NEVER,
45
Observable,
56
Subject,
7+
catchError,
68
concat,
79
defer,
810
distinctUntilChanged,
@@ -25,6 +27,7 @@ export interface ColdObservableProviderProps<T> {
2527
combinator?: typeof switchMap;
2628
cancel$?: Observable<unknown>;
2729
pollUntil?: (v: T) => boolean;
30+
logger: Logger;
2831
}
2932

3033
export const coldObservableProvider = <T>({
@@ -35,7 +38,8 @@ export const coldObservableProvider = <T>({
3538
equals = strictEquals,
3639
combinator = switchMap,
3740
cancel$ = NEVER,
38-
pollUntil = () => true
41+
pollUntil = () => true,
42+
logger
3943
}: ColdObservableProviderProps<T>) =>
4044
new Observable<T>((subscriber) => {
4145
const cancelOnFatalError$ = new Subject<boolean>();
@@ -48,7 +52,7 @@ export const coldObservableProvider = <T>({
4852
mergeMap((v) =>
4953
pollUntil(v)
5054
? of(v)
51-
: // emit value, but also throw error to force retryBackoff to kick in
55+
: // Emit value, but also throw error to force retryBackoff to kick in
5256
concat(
5357
of(v),
5458
throwError(() => new Error('polling'))
@@ -58,16 +62,33 @@ export const coldObservableProvider = <T>({
5862
).pipe(
5963
retryBackoff({
6064
...retryBackoffConfig,
65+
maxRetries: retryBackoffConfig.maxRetries ?? 10,
6166
shouldRetry: (error) => {
62-
if (retryBackoffConfig.shouldRetry && !retryBackoffConfig.shouldRetry(error)) return false;
67+
logger.error(error);
68+
69+
if (retryBackoffConfig.shouldRetry) {
70+
const shouldRetry = retryBackoffConfig.shouldRetry(error);
71+
logger.debug(`Should retry: ${shouldRetry}`);
72+
73+
if (!shouldRetry) {
74+
return false;
75+
}
76+
}
6377

6478
if (error instanceof InvalidStringError) {
6579
onFatalError?.(error);
6680
cancelOnFatalError$.next(true);
81+
return false;
6782
}
6883

6984
return true;
7085
}
86+
}),
87+
catchError((error) => {
88+
onFatalError?.(error);
89+
90+
// Re-throw the error to propagate it to the subscriber and complete the observable
91+
return throwError(() => error);
7192
})
7293
)
7394
),

packages/util-rxjs/test/coldObservableProvider.test.ts

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BehaviorSubject, EmptyError, Subject, firstValueFrom, lastValueFrom, tap } from 'rxjs';
22
import { InvalidStringError } from '@cardano-sdk/util';
3+
import { Logger } from 'ts-log';
34
import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs';
45
import { coldObservableProvider } from '../src';
56

@@ -8,11 +9,24 @@ jest.mock('backoff-rxjs', () => ({
89
retryBackoff: jest.fn().mockImplementation((...args) => jest.requireActual('backoff-rxjs').retryBackoff(...args))
910
}));
1011

12+
const createMockLogger = (): Logger => ({
13+
debug: jest.fn(),
14+
error: jest.fn(),
15+
info: jest.fn(),
16+
trace: jest.fn(),
17+
warn: jest.fn()
18+
});
19+
1120
describe('coldObservableProvider', () => {
1221
it('returns an observable that calls underlying provider on each subscription and uses retryBackoff', async () => {
1322
const underlyingProvider = jest.fn().mockResolvedValue(true);
1423
const backoffConfig: RetryBackoffConfig = { initialInterval: 1 };
15-
const provider$ = coldObservableProvider({ provider: underlyingProvider, retryBackoffConfig: backoffConfig });
24+
const logger = createMockLogger();
25+
const provider$ = coldObservableProvider({
26+
logger,
27+
provider: underlyingProvider,
28+
retryBackoffConfig: backoffConfig
29+
});
1630
expect(await firstValueFrom(provider$)).toBe(true);
1731
expect(await firstValueFrom(provider$)).toBe(true);
1832
expect(underlyingProvider).toBeCalledTimes(2);
@@ -24,8 +38,10 @@ describe('coldObservableProvider', () => {
2438
const underlyingProvider = () => firstValueFrom(fakeProviderSubject);
2539
const backoffConfig: RetryBackoffConfig = { initialInterval: 1 };
2640
const cancel$ = new BehaviorSubject<boolean>(true);
41+
const logger = createMockLogger();
2742
const provider$ = coldObservableProvider({
2843
cancel$,
44+
logger,
2945
provider: underlyingProvider,
3046
retryBackoffConfig: backoffConfig
3147
});
@@ -39,9 +55,10 @@ describe('coldObservableProvider', () => {
3955
});
4056

4157
it('retries using retryBackoff, when underlying provider rejects', async () => {
58+
const logger = createMockLogger();
4259
const underlyingProvider = jest.fn().mockRejectedValueOnce(false).mockResolvedValue(true);
4360
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1 };
44-
const provider$ = coldObservableProvider({ provider: underlyingProvider, retryBackoffConfig });
61+
const provider$ = coldObservableProvider({ logger, provider: underlyingProvider, retryBackoffConfig });
4562
const resolvedValue = await firstValueFrom(provider$);
4663
expect(underlyingProvider).toBeCalledTimes(2);
4764
expect(resolvedValue).toBeTruthy();
@@ -50,6 +67,7 @@ describe('coldObservableProvider', () => {
5067
it('does not retry, when underlying provider rejects with InvalidStringError', async () => {
5168
const testValue = { test: 'value' };
5269
const testError = new InvalidStringError('Test invalid string error');
70+
const logger = createMockLogger();
5371
const underlyingProvider = jest
5472
.fn()
5573
.mockRejectedValueOnce(new Error('Test error'))
@@ -59,6 +77,7 @@ describe('coldObservableProvider', () => {
5977
const onFatalError = jest.fn();
6078
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, shouldRetry: () => true };
6179
const provider$ = coldObservableProvider({
80+
logger,
6281
onFatalError,
6382
provider: underlyingProvider,
6483
retryBackoffConfig
@@ -67,6 +86,7 @@ describe('coldObservableProvider', () => {
6786
await expect(firstValueFrom(provider$)).rejects.toThrow(EmptyError);
6887
expect(underlyingProvider).toBeCalledTimes(3);
6988
expect(onFatalError).toBeCalledWith(testError);
89+
expect(logger.error).toBeCalledWith(testError);
7090
});
7191

7292
it('polls the provider until the pollUntil condition is satisfied', async () => {
@@ -77,8 +97,10 @@ describe('coldObservableProvider', () => {
7797
.mockResolvedValueOnce('c')
7898
.mockResolvedValue('Never reached');
7999
const backoffConfig: RetryBackoffConfig = { initialInterval: 1 };
100+
const logger = createMockLogger();
80101

81102
const provider$ = coldObservableProvider({
103+
logger,
82104
pollUntil: (v) => v === 'c',
83105
provider: underlyingProvider,
84106
retryBackoffConfig: backoffConfig
@@ -89,4 +111,27 @@ describe('coldObservableProvider', () => {
89111
expect(providerValues).toEqual(['a', 'b', 'c']);
90112
expect(underlyingProvider).toBeCalledTimes(3);
91113
});
114+
115+
it('stops retrying after maxRetries attempts and handles the error in catchError', async () => {
116+
const testError = new Error('Test error');
117+
const underlyingProvider = jest.fn().mockRejectedValue(testError);
118+
const maxRetries = 3;
119+
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, maxRetries };
120+
const onFatalError = jest.fn();
121+
const logger = createMockLogger();
122+
123+
const provider$ = coldObservableProvider({
124+
logger,
125+
onFatalError,
126+
provider: underlyingProvider,
127+
retryBackoffConfig
128+
});
129+
130+
await expect(firstValueFrom(provider$)).rejects.toThrow(testError);
131+
132+
expect(underlyingProvider).toBeCalledTimes(maxRetries + 1);
133+
expect(onFatalError).toBeCalledWith(expect.any(Error));
134+
expect(logger.error).toHaveBeenCalled();
135+
expect(logger.error).toHaveBeenCalledWith(testError);
136+
});
92137
});

packages/wallet/src/Wallets/BaseWallet.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ export class BaseWallet implements ObservableWallet {
370370
this.#addressTracker = createAddressTracker({
371371
addressDiscovery$: coldObservableProvider({
372372
cancel$,
373+
logger: this.#logger,
373374
onFatalError,
374375
provider: () => {
375376
const credManager = this.#publicCredentialsManager as Bip32PublicCredentialsManager;
@@ -403,6 +404,7 @@ export class BaseWallet implements ObservableWallet {
403404
minPollInterval: pollInterval,
404405
provider$: coldObservableProvider({
405406
cancel$,
407+
logger: this.#logger,
406408
onFatalError,
407409
provider: this.networkInfoProvider.ledgerTip,
408410
retryBackoffConfig
@@ -426,6 +428,7 @@ export class BaseWallet implements ObservableWallet {
426428
coldObservableProvider({
427429
cancel$,
428430
equals: deepEquals,
431+
logger: this.#logger,
429432
onFatalError,
430433
provider: this.networkInfoProvider.eraSummaries,
431434
retryBackoffConfig,
@@ -449,6 +452,7 @@ export class BaseWallet implements ObservableWallet {
449452
coldObservableProvider({
450453
cancel$,
451454
equals: isEqual,
455+
logger: this.#logger,
452456
onFatalError,
453457
provider: this.networkInfoProvider.protocolParameters,
454458
retryBackoffConfig,
@@ -460,6 +464,7 @@ export class BaseWallet implements ObservableWallet {
460464
coldObservableProvider({
461465
cancel$,
462466
equals: isEqual,
467+
logger: this.#logger,
463468
onFatalError,
464469
provider: this.networkInfoProvider.genesisParameters,
465470
retryBackoffConfig,
@@ -593,6 +598,7 @@ export class BaseWallet implements ObservableWallet {
593598
coldObservableProvider({
594599
cancel$,
595600
equals: isEqual,
601+
logger: this.#logger,
596602
onFatalError,
597603
provider: () => this.handleProvider.getPolicyIds(),
598604
retryBackoffConfig

packages/wallet/src/services/AssetsTracker.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ export const createAssetService =
131131
assetCache$: Observable<Assets>,
132132
totalBalance$: Observable<Cardano.Value>,
133133
retryBackoffConfig: RetryBackoffConfig,
134+
logger: Logger,
134135
onFatalError?: (value: unknown) => void,
135136
maxAssetInfoCacheAge: Milliseconds = ONE_WEEK
136137
// eslint-disable-next-line max-params
@@ -139,6 +140,7 @@ export const createAssetService =
139140
concatAndCombineLatest(
140141
chunk(assetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) =>
141142
coldObservableProvider({
143+
logger,
142144
onFatalError,
143145
pollUntil: isEveryAssetInfoComplete,
144146
provider: () =>
@@ -189,6 +191,7 @@ export const createAssetsTracker = (
189191
assetsCache$,
190192
total$,
191193
retryBackoffConfig,
194+
logger,
192195
onFatalError,
193196
maxAssetInfoCacheAge
194197
)

packages/wallet/src/services/DelegationTracker/DelegationTracker.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ export const createBlockEpochProvider =
3131
(
3232
chainHistoryProvider: ChainHistoryProvider,
3333
retryBackoffConfig: RetryBackoffConfig,
34+
logger: Logger,
3435
onFatalError?: (value: unknown) => void
3536
) =>
3637
(ids: Cardano.BlockId[]) =>
3738
coldObservableProvider({
39+
logger,
3840
onFatalError,
3941
provider: () => chainHistoryProvider.blocksByHashes({ ids }),
4042
retryBackoffConfig
@@ -131,6 +133,7 @@ export const createDelegationTracker = ({
131133
stakePoolProvider,
132134
stores.stakePools,
133135
retryBackoffConfig,
136+
logger,
134137
onFatalError
135138
),
136139
rewardsHistoryProvider = createRewardsHistoryProvider(rewardsTracker, retryBackoffConfig),
@@ -139,6 +142,7 @@ export const createDelegationTracker = ({
139142
transactionsTracker.outgoing.onChain$,
140143
rewardsTracker,
141144
retryBackoffConfig,
145+
logger,
142146
onFatalError
143147
),
144148
slotEpochCalc$ = eraSummaries$.pipe(map((eraSummaries) => createSlotEpochCalc(eraSummaries)))

packages/wallet/src/services/DelegationTracker/RewardAccounts.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
tap
2020
} from 'rxjs';
2121
import { KeyValueStore } from '../../persistence';
22+
import { Logger } from 'ts-log';
2223
import { OutgoingOnChainTx, TxInFlight } from '../types';
2324
import { PAGE_SIZE } from '../TransactionsTracker';
2425
import { RetryBackoffConfig } from 'backoff-rxjs';
@@ -56,6 +57,7 @@ export const createQueryStakePoolsProvider =
5657
stakePoolProvider: TrackedStakePoolProvider,
5758
store: KeyValueStore<Cardano.PoolId, Cardano.StakePool>,
5859
retryBackoffConfig: RetryBackoffConfig,
60+
logger: Logger,
5961
onFatalError?: (value: unknown) => void
6062
) =>
6163
(poolIds: Cardano.PoolId[]) => {
@@ -66,6 +68,7 @@ export const createQueryStakePoolsProvider =
6668
return merge(
6769
store.getValues(poolIds),
6870
coldObservableProvider({
71+
logger,
6972
onFatalError,
7073
provider: () => allStakePoolsByPoolIds(stakePoolProvider, { poolIds }),
7174
retryBackoffConfig
@@ -109,13 +112,16 @@ export const createRewardsProvider =
109112
txOnChain$: Observable<OutgoingOnChainTx>,
110113
rewardsProvider: RewardsProvider,
111114
retryBackoffConfig: RetryBackoffConfig,
115+
logger: Logger,
112116
onFatalError?: (value: unknown) => void
117+
// eslint-disable-next-line max-params
113118
) =>
114119
(rewardAccounts: Cardano.RewardAccount[], equals = isEqual): Observable<Cardano.Lovelace[]> =>
115120
combineLatest(
116121
rewardAccounts.map((rewardAccount) =>
117122
coldObservableProvider({
118123
equals,
124+
logger,
119125
onFatalError,
120126
provider: () => rewardsProvider.rewardAccountBalance({ rewardAccount }),
121127
retryBackoffConfig,

packages/wallet/src/services/DelegationTracker/RewardsHistory.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ export const createRewardsHistoryProvider =
2424
(
2525
rewardAccounts: Cardano.RewardAccount[],
2626
lowerBound: Cardano.EpochNo | null,
27+
logger: Logger,
2728
onFatalError?: (value: unknown) => void
2829
): Observable<Map<Cardano.RewardAccount, Reward[]>> => {
2930
if (lowerBound) {
3031
return coldObservableProvider({
32+
logger,
3133
onFatalError,
3234
provider: () =>
3335
rewardsProvider.rewardsHistory({
@@ -76,7 +78,7 @@ export const createRewardsHistoryTracker = (
7678
firstDelegationEpoch$(transactions$, rewardAccounts).pipe(
7779
tap((firstEpoch) => logger.debug(`Fetching history rewards since epoch ${firstEpoch}`)),
7880
switchMap((firstEpoch) =>
79-
rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), onFatalError)
81+
rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), logger, onFatalError)
8082
),
8183
tap((allRewards) =>
8284
rewardsHistoryStore.setAll([...allRewards.entries()].map(([key, value]) => ({ key, value })))

packages/wallet/src/services/DrepInfoTracker.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ type DrepInfoObservableProps = {
1515

1616
/** Use DRepProvider to fetch DRepInfos with retry backoff logic */
1717
export const createDrepInfoColdObservable =
18-
({ drepProvider, retryBackoffConfig, refetchTrigger$ }: DrepInfoObservableProps) =>
18+
({ drepProvider, retryBackoffConfig, refetchTrigger$, logger }: DrepInfoObservableProps) =>
1919
(drepIds: Cardano.DRepID[]) =>
2020
coldObservableProvider<DRepInfo[]>({
21+
logger,
2122
provider: () => drepProvider.getDRepsInfo({ ids: drepIds }),
2223
retryBackoffConfig,
2324
trigger$: merge(of(true), refetchTrigger$)

0 commit comments

Comments
 (0)