diff --git a/apps/server/src/config.ts b/apps/server/src/config.ts index f52bf0159..8d723f81a 100644 --- a/apps/server/src/config.ts +++ b/apps/server/src/config.ts @@ -36,10 +36,11 @@ export const CONFIG: Config = { fs_device_filter: lst(penv('FS_DEVICE_FILTER') ?? ''), fs_type_filter: lst( penv('FS_TYPE_FILTER') ?? - 'cifs,9p,fuse.rclone,fuse.mergerfs,nfs4,iso9660,fuse.shfs,autofs' + 'cifs,9p,fuse.rclone,fuse.mergerfs,nfs4,iso9660,fuse.shfs,autofs' ), fs_virtual_mounts: lst(penv('FS_VIRTUAL_MOUNTS') ?? ''), disable_integrations: penv('DISABLE_INTEGRATIONS') === 'true', + disable_background_stats_collection: penv('DISABLE_BACKGROUND_STATS_COLLECTION') === 'true', show_dash_version: (penv('SHOW_DASH_VERSION') as any) ?? 'icon_hover', show_host: penv('SHOW_HOST') === 'true', diff --git a/apps/server/src/dynamic-info.ts b/apps/server/src/dynamic-info.ts index 196a6085b..b8e311ccd 100644 --- a/apps/server/src/dynamic-info.ts +++ b/apps/server/src/dynamic-info.ts @@ -1,5 +1,5 @@ import cron from 'node-cron'; -import { interval, mergeMap, Observable, ReplaySubject, Subject } from 'rxjs'; +import { debounceTime, interval, lastValueFrom, mergeMap, Observable, Observer, ReplaySubject, Subject, Subscribable, take, takeUntil, timeout, Unsubscribable } from 'rxjs'; import { inspect } from 'util'; import { CONFIG } from './config'; import getCpuInfo from './data/cpu'; @@ -9,77 +9,146 @@ import getRamInfo from './data/ram'; import getStorageInfo from './data/storage'; import { loadInfo } from './static-info'; -const createBufferedInterval = ( - name: string, - enabled: boolean, - bufferSize: number, - intervalMs: number, - factory: () => Promise -): Observable => { - const buffer = new ReplaySubject(bufferSize); - - if (enabled) { - // Instantly load first value - factory() +class LazyObservable implements Subscribable { + + private observers = []; + + private currentBuffer: Observable; + + private stop = new Subject(); + + constructor( + private name: string, + private enabled: boolean, + private runInBackground: boolean, + private bufferSize: number, + private intervalMs: number, + private dataFactory: () => Promise) { + + if (this.runInBackground) { + this.tryStart(); + } + + } + + public subscribe(observer: Partial> | ((value: T) => void)): Unsubscribable { + if (!this.enabled) { + return new LazyUnsubscribe(); + } + this.tryStart(); + var subscription = this.currentBuffer.subscribe(observer); + this.observers.push(observer); + + return new LazyUnsubscribe(() => { + this.observers = this.observers.filter(x => x != observer); + this.tryComplete(); + subscription.unsubscribe(); + }); + } + + public async getCurrentValue(): Promise { + try { + if (this.currentBuffer) { + return await lastValueFrom(this.currentBuffer.pipe(debounceTime(0), timeout(20), take(1))) + } + return this.dataFactory();; + } catch (e) { + return undefined; + } + }; + + private tryComplete() { + if (!this.runInBackground && this.observers.length == 0) { + this.stop.next(1); + this.stop = new Subject(); + this.currentBuffer = null; + } + } + + private tryStart() { + if (this.currentBuffer == null) { + this.currentBuffer = this.createBuffer(); + } + } + + private createBuffer() { + const buffer = new ReplaySubject(this.bufferSize); + + this.dataFactory() .then(value => { console.log( - `First measurement [${name}]:`, + `First measurement [${this.name}]:`, inspect(value, { showHidden: false, depth: null, colors: true, }) ); - buffer.next(value); }) .catch(err => buffer.error(err)); - // Load values every intervalMs - interval(intervalMs).pipe(mergeMap(factory)).subscribe(buffer); - + interval(this.intervalMs) + .pipe(mergeMap(this.dataFactory), takeUntil(this.stop)) + .subscribe(buffer); return buffer.asObservable(); } +} - return new Observable(); -}; +class LazyUnsubscribe implements Unsubscribable { + constructor(private callback: () => void = null) { } + unsubscribe(): void { + if (this.callback) + this.callback(); + } +} export const getDynamicServerInfo = () => { - const cpuObs = createBufferedInterval( + + // if not disabled, we collect stats in the background + // this was the default behavior until now. keeping it as is + const runInBackground = !CONFIG.disable_background_stats_collection; + + const cpuObs = new LazyObservable( 'CPU', CONFIG.widget_list.includes('cpu'), + runInBackground, CONFIG.cpu_shown_datapoints, CONFIG.cpu_poll_interval, getCpuInfo.dynamic ); - const ramObs = createBufferedInterval( + const ramObs = new LazyObservable( 'RAM', CONFIG.widget_list.includes('ram'), + runInBackground, CONFIG.ram_shown_datapoints, CONFIG.ram_poll_interval, getRamInfo.dynamic ); - const storageObs = createBufferedInterval( + const storageObs = new LazyObservable( 'Storage', CONFIG.widget_list.includes('storage'), + runInBackground, 1, CONFIG.storage_poll_interval, getStorageInfo.dynamic ); - const networkObs = createBufferedInterval( + const networkObs = new LazyObservable( 'Network', CONFIG.widget_list.includes('network'), + runInBackground, CONFIG.network_shown_datapoints, CONFIG.network_poll_interval, getNetworkInfo.dynamic ); - const gpuObs = createBufferedInterval( + const gpuObs = new LazyObservable( 'GPU', CONFIG.widget_list.includes('gpu'), + runInBackground, CONFIG.gpu_shown_datapoints, CONFIG.gpu_poll_interval, getGpuInfo.dynamic diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 1c0a54319..164a0e2ba 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -7,12 +7,7 @@ import http from 'http'; import cron from 'node-cron'; import path from 'path'; import { - debounceTime, - lastValueFrom, - Observable, - Subscription, - take, - timeout, + Unsubscribable, } from 'rxjs'; import { Server } from 'socket.io'; import Path from 'path'; @@ -110,38 +105,26 @@ server.listen(CONFIG.port, async () => { // Allow integrations if (!CONFIG.disable_integrations) { - const getCurrentValue = async ( - subj: Observable - ): Promise => { - try { - return await lastValueFrom( - subj.pipe(debounceTime(0), timeout(20), take(1)) - ); - } catch (e) { - return undefined; - } - }; - router.get('/load/cpu', async (_, res) => { - res.send(await getCurrentValue(obs.cpu)); + res.send(await obs.cpu.getCurrentValue()); }); router.get('/load/ram', async (_, res) => { - res.send({ load: await getCurrentValue(obs.ram) }); + res.send({ load: await obs.ram.getCurrentValue() }); }); router.get('/load/storage', async (_, res) => { - res.send(await getCurrentValue(obs.storage)); + res.send(await obs.storage.getCurrentValue()); }); router.get('/load/network', async (_, res) => { - res.send(await getCurrentValue(obs.network)); + res.send(await obs.network.getCurrentValue()); }); router.get('/load/gpu', async (_, res) => { - res.send(await getCurrentValue(obs.gpu)); + res.send(await obs.gpu.getCurrentValue()); }); } // Send current system status io.on('connection', socket => { - const subscriptions: Subscription[] = []; + const subscriptions: Unsubscribable[] = []; subscriptions.push( getStaticServerInfoObs().subscribe(staticInfo => { @@ -178,7 +161,6 @@ server.listen(CONFIG.port, async () => { socket.emit('gpu-load', gpu); }) ); - socket.on('disconnect', () => { subscriptions.forEach(sub => sub.unsubscribe()); }); diff --git a/libs/common/src/types.ts b/libs/common/src/types.ts index e017c5ac4..7f6748ef4 100644 --- a/libs/common/src/types.ts +++ b/libs/common/src/types.ts @@ -101,6 +101,7 @@ export type Config = { fs_type_filter: string[]; fs_virtual_mounts: string[]; disable_integrations: boolean; + disable_background_stats_collection: boolean; show_dash_version?: 'icon_hover' | 'bottom_right'; show_host: boolean;