Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/server/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ export const CONFIG: Config = {
fs_device_filter: lst(penv('FS_DEVICE_FILTER') ?? ''),
fs_type_filter: lst(
penv('FS_TYPE_FILTER') ??
'cifs,9p,fuse.rclone,fuse.mergerfs,nfs4,iso9660,fuse.shfs,autofs'
'cifs,9p,fuse.rclone,fuse.mergerfs,nfs4,iso9660,fuse.shfs,autofs'
),
fs_virtual_mounts: lst(penv('FS_VIRTUAL_MOUNTS') ?? ''),
disable_integrations: penv('DISABLE_INTEGRATIONS') === 'true',
disable_background_stats_collection: penv('DISABLE_BACKGROUND_STATS_COLLECTION') === 'true',

show_dash_version: (penv('SHOW_DASH_VERSION') as any) ?? 'icon_hover',
show_host: penv('SHOW_HOST') === 'true',
Expand Down
119 changes: 94 additions & 25 deletions apps/server/src/dynamic-info.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import cron from 'node-cron';
import { interval, mergeMap, Observable, ReplaySubject, Subject } from 'rxjs';
import { debounceTime, interval, lastValueFrom, mergeMap, Observable, Observer, ReplaySubject, Subject, Subscribable, take, takeUntil, timeout, Unsubscribable } from 'rxjs';
import { inspect } from 'util';
import { CONFIG } from './config';
import getCpuInfo from './data/cpu';
Expand All @@ -9,77 +9,146 @@ import getRamInfo from './data/ram';
import getStorageInfo from './data/storage';
import { loadInfo } from './static-info';

const createBufferedInterval = <R>(
name: string,
enabled: boolean,
bufferSize: number,
intervalMs: number,
factory: () => Promise<R>
): Observable<R> => {
const buffer = new ReplaySubject<R>(bufferSize);

if (enabled) {
// Instantly load first value
factory()
class LazyObservable<T> implements Subscribable<T> {

private observers = [];

private currentBuffer: Observable<T>;

private stop = new Subject<any>();

constructor(
private name: string,
private enabled: boolean,
private runInBackground: boolean,
private bufferSize: number,
private intervalMs: number,
private dataFactory: () => Promise<T>) {

if (this.runInBackground) {
this.tryStart();
}

}

public subscribe(observer: Partial<Observer<T>> | ((value: T) => void)): Unsubscribable {
if (!this.enabled) {
return new LazyUnsubscribe();
}
this.tryStart();
var subscription = this.currentBuffer.subscribe(observer);
this.observers.push(observer);

return new LazyUnsubscribe(() => {
this.observers = this.observers.filter(x => x != observer);
this.tryComplete();
subscription.unsubscribe();
});
}

public async getCurrentValue(): Promise<T | undefined> {
try {
if (this.currentBuffer) {
return await lastValueFrom(this.currentBuffer.pipe(debounceTime(0), timeout(20), take(1)))
}
return this.dataFactory();;
} catch (e) {
return undefined;
}
};

private tryComplete() {
if (!this.runInBackground && this.observers.length == 0) {
this.stop.next(1);
this.stop = new Subject();
this.currentBuffer = null;
}
}

private tryStart() {
if (this.currentBuffer == null) {
this.currentBuffer = this.createBuffer();
}
}

private createBuffer() {
const buffer = new ReplaySubject<T>(this.bufferSize);

this.dataFactory()
.then(value => {
console.log(
`First measurement [${name}]:`,
`First measurement [${this.name}]:`,
inspect(value, {
showHidden: false,
depth: null,
colors: true,
})
);

buffer.next(value);
})
.catch(err => buffer.error(err));

// Load values every intervalMs
interval(intervalMs).pipe(mergeMap(factory)).subscribe(buffer);

interval(this.intervalMs)
.pipe(mergeMap(this.dataFactory), takeUntil(this.stop))
.subscribe(buffer);
return buffer.asObservable();
}
}

return new Observable();
};
class LazyUnsubscribe implements Unsubscribable {
constructor(private callback: () => void = null) { }
unsubscribe(): void {
if (this.callback)
this.callback();
}
}

export const getDynamicServerInfo = () => {
const cpuObs = createBufferedInterval(

// if not disabled, we collect stats in the background
// this was the default behavior until now. keeping it as is
const runInBackground = !CONFIG.disable_background_stats_collection;

const cpuObs = new LazyObservable(
'CPU',
CONFIG.widget_list.includes('cpu'),
runInBackground,
CONFIG.cpu_shown_datapoints,
CONFIG.cpu_poll_interval,
getCpuInfo.dynamic
);

const ramObs = createBufferedInterval(
const ramObs = new LazyObservable(
'RAM',
CONFIG.widget_list.includes('ram'),
runInBackground,
CONFIG.ram_shown_datapoints,
CONFIG.ram_poll_interval,
getRamInfo.dynamic
);

const storageObs = createBufferedInterval(
const storageObs = new LazyObservable(
'Storage',
CONFIG.widget_list.includes('storage'),
runInBackground,
1,
CONFIG.storage_poll_interval,
getStorageInfo.dynamic
);

const networkObs = createBufferedInterval(
const networkObs = new LazyObservable(
'Network',
CONFIG.widget_list.includes('network'),
runInBackground,
CONFIG.network_shown_datapoints,
CONFIG.network_poll_interval,
getNetworkInfo.dynamic
);

const gpuObs = createBufferedInterval(
const gpuObs = new LazyObservable(
'GPU',
CONFIG.widget_list.includes('gpu'),
runInBackground,
CONFIG.gpu_shown_datapoints,
CONFIG.gpu_poll_interval,
getGpuInfo.dynamic
Expand Down
32 changes: 7 additions & 25 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@ import http from 'http';
import cron from 'node-cron';
import path from 'path';
import {
debounceTime,
lastValueFrom,
Observable,
Subscription,
take,
timeout,
Unsubscribable,
} from 'rxjs';
import { Server } from 'socket.io';
import Path from 'path';
Expand Down Expand Up @@ -110,38 +105,26 @@ server.listen(CONFIG.port, async () => {

// Allow integrations
if (!CONFIG.disable_integrations) {
const getCurrentValue = async <T>(
subj: Observable<T>
): Promise<T | undefined> => {
try {
return await lastValueFrom(
subj.pipe(debounceTime(0), timeout(20), take(1))
);
} catch (e) {
return undefined;
}
};

router.get('/load/cpu', async (_, res) => {
res.send(await getCurrentValue(obs.cpu));
res.send(await obs.cpu.getCurrentValue());
});
router.get('/load/ram', async (_, res) => {
res.send({ load: await getCurrentValue(obs.ram) });
res.send({ load: await obs.ram.getCurrentValue() });
});
router.get('/load/storage', async (_, res) => {
res.send(await getCurrentValue(obs.storage));
res.send(await obs.storage.getCurrentValue());
});
router.get('/load/network', async (_, res) => {
res.send(await getCurrentValue(obs.network));
res.send(await obs.network.getCurrentValue());
});
router.get('/load/gpu', async (_, res) => {
res.send(await getCurrentValue(obs.gpu));
res.send(await obs.gpu.getCurrentValue());
});
}

// Send current system status
io.on('connection', socket => {
const subscriptions: Subscription[] = [];
const subscriptions: Unsubscribable[] = [];

subscriptions.push(
getStaticServerInfoObs().subscribe(staticInfo => {
Expand Down Expand Up @@ -178,7 +161,6 @@ server.listen(CONFIG.port, async () => {
socket.emit('gpu-load', gpu);
})
);

socket.on('disconnect', () => {
subscriptions.forEach(sub => sub.unsubscribe());
});
Expand Down
1 change: 1 addition & 0 deletions libs/common/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export type Config = {
fs_type_filter: string[];
fs_virtual_mounts: string[];
disable_integrations: boolean;
disable_background_stats_collection: boolean;

show_dash_version?: 'icon_hover' | 'bottom_right';
show_host: boolean;
Expand Down