11import { Asset , Cardano } from '@cardano-sdk/core' ;
2+ import { Assets } from '../types' ;
3+ import { BalanceTracker , Milliseconds , TransactionsTracker } from './types' ;
24import { Logger } from 'ts-log' ;
35import {
46 Observable ,
@@ -8,6 +10,7 @@ import {
810 debounceTime ,
911 distinctUntilChanged ,
1012 filter ,
13+ firstValueFrom ,
1114 map ,
1215 of ,
1316 share ,
@@ -17,7 +20,6 @@ import {
1720} from 'rxjs' ;
1821import { RetryBackoffConfig } from 'backoff-rxjs' ;
1922import { TrackedAssetProvider } from './ProviderTracker' ;
20- import { TransactionsTracker } from './types' ;
2123import { coldObservableProvider , concatAndCombineLatest } from '@cardano-sdk/util-rxjs' ;
2224import { deepEquals , isNotNil } from '@cardano-sdk/util' ;
2325import { newTransactions$ } from './TransactionsTracker' ;
@@ -35,11 +37,103 @@ const bufferTick =
3537 source$ . pipe ( connect ( ( shared$ ) => shared$ . pipe ( buffer ( shared$ . pipe ( debounceTime ( 1 ) ) ) ) ) ) ;
3638
3739const ASSET_INFO_FETCH_CHUNK_SIZE = 100 ;
40+ const ONE_DAY = 24 * 60 * 60 * 1000 ;
41+ const ONE_WEEK = 7 * ONE_DAY ;
42+
43+ const isInBalance = ( assetId : Cardano . AssetId , balance : Cardano . Value ) : boolean =>
44+ balance . assets ?. has ( assetId ) ?? false ;
45+
46+ /**
47+ * Splits a list of asset IDs into cached and uncached groups based on their presence in the cache,
48+ * their freshness, and their balance status:
49+ *
50+ * 1. Assets not in Balance:
51+ * - Always use the cached version if present in the cache, ignoring freshness.
52+ * 2. Assets in Balance:
53+ * - Use the cached version only if it exists and its `staleAt` timestamp did not expire.
54+ * 3. Uncached Assets:
55+ * - If an asset is not in the cache or does not meet the above criteria, mark it as uncached.
56+ */
57+ const splitCachedAndUncachedAssets = (
58+ cache : Assets ,
59+ balance : Cardano . Value ,
60+ assetIds : Cardano . AssetId [ ]
61+ ) : { cachedAssets : Assets ; uncachedAssetIds : Cardano . AssetId [ ] } => {
62+ const cachedAssets : Assets = new Map ( ) ;
63+ const uncachedAssetIds : Cardano . AssetId [ ] = [ ] ;
64+ const now = new Date ( ) ;
65+
66+ for ( const id of assetIds ) {
67+ const cachedAssetInfo = cache . get ( id ) ;
68+
69+ if ( ! cachedAssetInfo ) {
70+ uncachedAssetIds . push ( id ) ;
71+ continue ;
72+ }
73+
74+ const { staleAt } = cachedAssetInfo ;
75+
76+ const expired = ! staleAt || new Date ( staleAt ) < now ;
77+
78+ const mustFetch = ! isAssetInfoComplete ( cachedAssetInfo ) || ( isInBalance ( id , balance ) && expired ) ;
79+
80+ if ( mustFetch ) {
81+ uncachedAssetIds . push ( id ) ;
82+ } else {
83+ cachedAssets . set ( id , cachedAssetInfo ) ;
84+ }
85+ }
86+
87+ return { cachedAssets, uncachedAssetIds } ;
88+ } ;
89+
90+ const getAssetsWithCache = async (
91+ assetIdsChunk : Cardano . AssetId [ ] ,
92+ assetCache$ : Observable < Assets > ,
93+ totalBalance$ : Observable < Cardano . Value > ,
94+ assetProvider : TrackedAssetProvider ,
95+ maxAssetInfoCacheAge : Milliseconds
96+ ) : Promise < Asset . AssetInfo [ ] > => {
97+ const [ cache , totalValue ] = await Promise . all ( [ firstValueFrom ( assetCache$ ) , firstValueFrom ( totalBalance$ ) ] ) ;
98+
99+ const { cachedAssets, uncachedAssetIds } = splitCachedAndUncachedAssets ( cache , totalValue , assetIdsChunk ) ;
100+
101+ if ( uncachedAssetIds . length === 0 ) {
102+ // If all assets are cached we wont perform any fetches from assetProvider, but still need to
103+ // mark it as initialized.
104+ if ( ! assetProvider . stats . getAsset$ . value . initialized ) {
105+ assetProvider . setStatInitialized ( assetProvider . stats . getAsset$ ) ;
106+ }
107+
108+ return [ ...cachedAssets . values ( ) ] ;
109+ }
110+
111+ const fetchedAssets = await assetProvider . getAssets ( {
112+ assetIds : uncachedAssetIds ,
113+ extraData : { nftMetadata : true , tokenMetadata : true }
114+ } ) ;
115+
116+ const now = Date . now ( ) ;
117+ const updatedFetchedAssets = fetchedAssets . map ( ( asset ) => {
118+ const randomDelta = Math . floor ( Math . random ( ) * 2 * ONE_DAY ) ; // Random time between 0 and 2 days
119+ return {
120+ ...asset ,
121+ staleAt : new Date ( now + maxAssetInfoCacheAge + randomDelta )
122+ } ;
123+ } ) ;
124+
125+ return [ ...cachedAssets . values ( ) , ...updatedFetchedAssets ] ;
126+ } ;
127+
38128export const createAssetService =
39129 (
40130 assetProvider : TrackedAssetProvider ,
131+ assetCache$ : Observable < Assets > ,
132+ totalBalance$ : Observable < Cardano . Value > ,
41133 retryBackoffConfig : RetryBackoffConfig ,
42- onFatalError ?: ( value : unknown ) => void
134+ onFatalError ?: ( value : unknown ) => void ,
135+ maxAssetInfoCacheAge : Milliseconds = ONE_WEEK
136+ // eslint-disable-next-line max-params
43137 ) =>
44138 ( assetIds : Cardano . AssetId [ ] ) =>
45139 concatAndCombineLatest (
@@ -48,23 +142,24 @@ export const createAssetService =
48142 onFatalError,
49143 pollUntil : isEveryAssetInfoComplete ,
50144 provider : ( ) =>
51- assetProvider . getAssets ( {
52- assetIds : assetIdsChunk ,
53- extraData : { nftMetadata : true , tokenMetadata : true }
54- } ) ,
145+ getAssetsWithCache ( assetIdsChunk , assetCache$ , totalBalance$ , assetProvider , maxAssetInfoCacheAge ) ,
55146 retryBackoffConfig,
56147 trigger$ : of ( true ) // fetch only once
57148 } )
58149 )
59- ) . pipe ( map ( ( arr ) => arr . flat ( ) ) ) ; // concat the chunk results
150+ ) . pipe ( map ( ( arr ) => arr . flat ( ) ) ) ; // Concatenate the chunk results
151+
60152export type AssetService = ReturnType < typeof createAssetService > ;
61153
62154export interface AssetsTrackerProps {
63155 transactionsTracker : TransactionsTracker ;
64156 assetProvider : TrackedAssetProvider ;
65157 retryBackoffConfig : RetryBackoffConfig ;
66158 logger : Logger ;
159+ assetsCache$ : Observable < Assets > ;
160+ balanceTracker : BalanceTracker ;
67161 onFatalError ?: ( value : unknown ) => void ;
162+ maxAssetInfoCacheAge ?: Milliseconds ;
68163}
69164
70165interface AssetsTrackerInternals {
@@ -76,8 +171,28 @@ const uniqueAssetIds = ({ body: { outputs } }: Cardano.OnChainTx) =>
76171const flatUniqueAssetIds = ( txes : Cardano . OnChainTx [ ] ) => uniq ( txes . flatMap ( uniqueAssetIds ) ) ;
77172
78173export const createAssetsTracker = (
79- { assetProvider, transactionsTracker : { history$ } , retryBackoffConfig, logger, onFatalError } : AssetsTrackerProps ,
80- { assetService = createAssetService ( assetProvider , retryBackoffConfig , onFatalError ) } : AssetsTrackerInternals = { }
174+ {
175+ assetProvider,
176+ assetsCache$,
177+ transactionsTracker : { history$ } ,
178+ balanceTracker : {
179+ utxo : { total$ }
180+ } ,
181+ retryBackoffConfig,
182+ logger,
183+ onFatalError,
184+ maxAssetInfoCacheAge
185+ } : AssetsTrackerProps ,
186+ {
187+ assetService = createAssetService (
188+ assetProvider ,
189+ assetsCache$ ,
190+ total$ ,
191+ retryBackoffConfig ,
192+ onFatalError ,
193+ maxAssetInfoCacheAge
194+ )
195+ } : AssetsTrackerInternals = { }
81196) =>
82197 new Observable < Map < Cardano . AssetId , Asset . AssetInfo > > ( ( subscriber ) => {
83198 let fetchedAssetInfoMap = new Map < Cardano . AssetId , Asset . AssetInfo > ( ) ;
0 commit comments