From 6c84bbc2c180e8035f69e6281de2c4465e0814a2 Mon Sep 17 00:00:00 2001 From: RMO Date: Wed, 27 Aug 2025 15:09:29 -0600 Subject: [PATCH] docs: add collaboration and live-stream references --- .../app/studio/src/service/SessionService.ts | 2 ++ .../app/studio/src/service/StudioSignal.ts | 2 ++ .../app/studio/src/service/SyncLogService.ts | 4 ++- .../docs/docs-dev/architecture/overview.md | 4 ++- .../docs/docs-dev/services/live-stream.md | 10 +++++++ .../docs/docs-user/features/collaboration.md | 13 +++++++++ packages/lib/fusion/src/live-stream/Flags.ts | 1 + .../src/live-stream/LiveStreamBroadcaster.ts | 29 +++++++++++++++---- .../src/live-stream/LiveStreamReceiver.ts | 23 +++++++++++++++ packages/lib/fusion/src/live-stream/Lock.ts | 1 + .../lib/fusion/src/live-stream/PackageType.ts | 1 + .../lib/fusion/src/live-stream/Protocol.ts | 13 +++++++++ .../lib/fusion/src/live-stream/Subscribers.ts | 12 ++++++++ 13 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 packages/docs/docs-dev/services/live-stream.md create mode 100644 packages/docs/docs-user/features/collaboration.md diff --git a/packages/app/studio/src/service/SessionService.ts b/packages/app/studio/src/service/SessionService.ts index aefcb99dd..2d52932d5 100644 --- a/packages/app/studio/src/service/SessionService.ts +++ b/packages/app/studio/src/service/SessionService.ts @@ -1,5 +1,7 @@ /** * Service handling persistence and lifecycle operations for project sessions. + * Works in concert with {@link SyncLogService} for commit logging and emits + * {@link StudioSignal} events to update the UI. */ import {ProjectSession} from "@/project/ProjectSession" import { diff --git a/packages/app/studio/src/service/StudioSignal.ts b/packages/app/studio/src/service/StudioSignal.ts index 24dac68e9..25664512b 100644 --- a/packages/app/studio/src/service/StudioSignal.ts +++ b/packages/app/studio/src/service/StudioSignal.ts @@ -3,6 +3,8 @@ import {Sample} from "@opendaw/studio-adapters" /** * Events broadcast by {@link StudioService} to update various UI elements. + * These complement the {@link SessionService} lifecycle and other session + * oriented services. * * ```mermaid * classDiagram diff --git a/packages/app/studio/src/service/SyncLogService.ts b/packages/app/studio/src/service/SyncLogService.ts index e151f68c5..30e84370e 100644 --- a/packages/app/studio/src/service/SyncLogService.ts +++ b/packages/app/studio/src/service/SyncLogService.ts @@ -6,7 +6,9 @@ import {StudioService} from "@/service/StudioService" import {Commit, SyncLogReader, SyncLogWriter} from "@opendaw/studio-core" /** - * Service helpers for writing and appending project SyncLog files. + * Service helpers for writing and appending project SyncLog files. Intended to + * be used alongside the {@link SessionService} to persist collaborative + * session history. * * ```mermaid * flowchart LR diff --git a/packages/docs/docs-dev/architecture/overview.md b/packages/docs/docs-dev/architecture/overview.md index d4d9a10fd..e346d4607 100644 --- a/packages/docs/docs-dev/architecture/overview.md +++ b/packages/docs/docs-dev/architecture/overview.md @@ -77,7 +77,9 @@ learn how to build the project in [Build and Run](../build-and-run/setup.md). - **Config** – Delivers runtime and build settings consumed by other components. Communication between these parts is based on lightweight message channels; see -the [messaging architecture](./messaging.md) for details. +the [messaging architecture](./messaging.md) for details. For real-time +collaboration features, explore the [Live Stream service](../services/live-stream.md) +and the user-facing [collaboration guide](../../docs-user/features/collaboration.md). ## Worker Lifecycle diff --git a/packages/docs/docs-dev/services/live-stream.md b/packages/docs/docs-dev/services/live-stream.md new file mode 100644 index 000000000..e064db146 --- /dev/null +++ b/packages/docs/docs-dev/services/live-stream.md @@ -0,0 +1,10 @@ +# Live Stream Service + +Streams real-time data between workers and the UI using shared memory buffers. + +```mermaid +flowchart LR + broadcaster -->|SharedArrayBuffer| receiver +``` + +See the user-facing [collaboration feature](../../docs-user/features/collaboration.md) for how this service is used. diff --git a/packages/docs/docs-user/features/collaboration.md b/packages/docs/docs-user/features/collaboration.md new file mode 100644 index 000000000..71aeb1d2f --- /dev/null +++ b/packages/docs/docs-user/features/collaboration.md @@ -0,0 +1,13 @@ +# Collaboration + +Share project changes in real time with other users. + +## Start a session +1. Open the collaboration menu. +2. Invite participants and begin sharing. + +## Leave a session +1. Open the collaboration menu. +2. Choose **Disconnect** to stop sharing updates. + +Developers can learn about the underlying service in the [Live Stream documentation](../../docs-dev/services/live-stream.md). diff --git a/packages/lib/fusion/src/live-stream/Flags.ts b/packages/lib/fusion/src/live-stream/Flags.ts index 403d9decb..4244e5765 100644 --- a/packages/lib/fusion/src/live-stream/Flags.ts +++ b/packages/lib/fusion/src/live-stream/Flags.ts @@ -1,3 +1,4 @@ +/** Magic values framing the stream and marking metadata boundaries. */ export const enum Flags { ID = 0xF0FF0F, START = 0xF0F0F0, END = 0x0F0F0F } \ No newline at end of file diff --git a/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts b/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts index c259bc59f..5120f1db0 100644 --- a/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts +++ b/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts @@ -24,6 +24,12 @@ interface Package { put(output: ByteArrayOutput): void } +/** + * Serialises values provided by various publishers into a shared buffer and + * forwards updates to a {@link LiveStreamReceiver}. The broadcaster owns the + * data buffer and controls when receivers may read from it by toggling a + * shared {@link Lock}. + */ export class LiveStreamBroadcaster { static create(messenger: Messenger, name: string): LiveStreamBroadcaster { return new LiveStreamBroadcaster(messenger.channel(name)) @@ -52,6 +58,12 @@ export class LiveStreamBroadcaster { }) } + /** + * Flushes the pending packages into the shared buffer. When the structure + * changes, the receiver is first notified with a new layout description and + * data buffer. Actual values are written only when the consumer signals that + * it is ready by setting the {@link Lock} to {@link Lock.WRITE}. + */ flush(): void { const update = this.#updateAvailable() if (update.nonEmpty()) { @@ -59,9 +71,9 @@ export class LiveStreamBroadcaster { this.#sender.sendShareLock(this.#lock) this.#lockShared = true } - this.#sender.sendUpdateStructure(update.unwrap()) - const capacity = this.#computeCapacity() - if (this.#output.remaining < capacity) { + this.#sender.sendUpdateStructure(update.unwrap()) + const capacity = this.#computeCapacity() + if (this.#output.remaining < capacity) { const size = nextPowOf2(capacity) const data = new SharedArrayBuffer(size) this.#output = ByteArrayOutput.use(data) @@ -70,8 +82,9 @@ export class LiveStreamBroadcaster { } } if (this.#sabOption.isEmpty()) {return} - // If main-thread is not interested, no data will ever be sent again, since it will not set the lock to CAN_WRITE. - // No lock is necessary since the other side skips reading until we set the lock to CAN_READ. + // If main-thread is not interested, no data will ever be sent again, + // since it will not set the lock to CAN_WRITE. No lock is necessary since + // the other side skips reading until we set the lock to CAN_READ. if (Atomics.load(this.#lockArray, 0) === Lock.WRITE) { this.#flushData(this.#output) this.#output.position = 0 @@ -156,6 +169,7 @@ export class LiveStreamBroadcaster { return this.#capacity } + /** Clears all stored packages and resets state. */ terminate(): void { Arrays.clear(this.#packages) this.#availableUpdate = Option.None @@ -163,6 +177,7 @@ export class LiveStreamBroadcaster { this.#capacity = 0 } + /** Writes the version and all package payloads into the buffer. */ #flushData(output: ByteArrayOutput): void { assert(!this.#invalid && this.#availableUpdate.isEmpty(), "Cannot flush while update is available") const requiredCapacity = this.#computeCapacity() @@ -173,10 +188,12 @@ export class LiveStreamBroadcaster { output.writeInt(Flags.END) } + /** Computes the sum of capacities for all registered packages. */ #sumRequiredCapacity(): int { return this.#packages.reduce((sum, pack) => sum + pack.capacity, 0) } + /** Registers a package and returns a handle to remove it again. */ #storeChunk(pack: Package): Terminable { this.#packages.push(pack) this.#invalidate() @@ -188,11 +205,13 @@ export class LiveStreamBroadcaster { } } + /** Marks the structure as outdated. */ #invalidate(): void { this.#capacity = -1 this.#invalid = true } + /** Builds a structure description listing all packages and their types. */ #compileStructure(): ArrayBufferLike { const output = ByteArrayOutput.create() output.writeInt(Flags.ID) diff --git a/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts b/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts index c172c8cc9..453905aee 100644 --- a/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts +++ b/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts @@ -21,6 +21,11 @@ import {Protocol} from "./Protocol" import {Lock} from "./Lock" import {Flags} from "./Flags" +/** + * Consumes live stream updates produced by a {@link LiveStreamBroadcaster}. + * The receiver reacts to structural changes, manages subscriptions and + * dispatches incoming values to listeners on every animation frame. + */ interface Package extends Terminable { dispatch(address: Address, input: ByteArrayInput): void subscribe(address: Address, procedure: Procedure): Subscription @@ -140,6 +145,11 @@ export class LiveStreamReceiver implements Terminable { this.#packages[PackageType.ByteArray] = this.#bytes } + /** + * Connects the receiver to a messenger channel and starts dispatching + * frames. The returned terminable tears down the connection and stops + * processing when invoked. + */ connect(messenger: Messenger): Terminable { assert(!this.#connected, "Already connected") this.#connected = true @@ -189,6 +199,10 @@ export class LiveStreamReceiver implements Terminable { terminate(): void {this.#disconnect()} + /** + * Called for every animation frame; checks the shared lock and dispatches + * the latest data package to all registered handlers when available. + */ #dispatch(): void { if (this.#optLock.isEmpty() || this.#memory.isEmpty()) {return} const lock = this.#optLock.unwrap() @@ -200,11 +214,16 @@ export class LiveStreamReceiver implements Terminable { } } + /** Rebuilds dispatch procedures when the stream structure changes. */ #updateStructure(input: ByteArrayInput): void { Arrays.clear(this.#procedures) this.#parseStructure(input) } + /** + * Reads a data block and executes the stored procedures in order. Outdated + * versions are skipped until a matching structure arrives. + */ #dispatchData(input: ByteArrayInput): boolean { const version = input.readInt() if (version !== this.#structureVersion) { @@ -217,6 +236,10 @@ export class LiveStreamReceiver implements Terminable { return true } + /** + * Parses a structure update and prepares dispatch procedures for each + * registered address/type pair. + */ #parseStructure(input: ByteArrayInput): void { if (input.readInt() !== Flags.ID) {throw new Error("no valid id")} const version = input.readInt() diff --git a/packages/lib/fusion/src/live-stream/Lock.ts b/packages/lib/fusion/src/live-stream/Lock.ts index b496ba355..5da78665d 100644 --- a/packages/lib/fusion/src/live-stream/Lock.ts +++ b/packages/lib/fusion/src/live-stream/Lock.ts @@ -1 +1,2 @@ +/** Access control flag for the shared data buffer. */ export enum Lock {WRITE = 0, READ = 1} \ No newline at end of file diff --git a/packages/lib/fusion/src/live-stream/PackageType.ts b/packages/lib/fusion/src/live-stream/PackageType.ts index d3eaefbc5..d008351e8 100644 --- a/packages/lib/fusion/src/live-stream/PackageType.ts +++ b/packages/lib/fusion/src/live-stream/PackageType.ts @@ -1 +1,2 @@ +/** Types of payloads that can be streamed between broadcaster and receiver. */ export enum PackageType {Float, FloatArray, Integer, IntegerArray, ByteArray} \ No newline at end of file diff --git a/packages/lib/fusion/src/live-stream/Protocol.ts b/packages/lib/fusion/src/live-stream/Protocol.ts index 95c9f51ae..8ae0b6d23 100644 --- a/packages/lib/fusion/src/live-stream/Protocol.ts +++ b/packages/lib/fusion/src/live-stream/Protocol.ts @@ -1,5 +1,18 @@ +/** + * Defines the messaging contract between a {@link LiveStreamBroadcaster} + * and a {@link LiveStreamReceiver}. Implementations forward shared memory + * references and update packets across thread boundaries. + */ export interface Protocol { + /** Share the lock used to coordinate access to the data buffer. */ sendShareLock(lock: SharedArrayBuffer): void + + /** Transfer a new data buffer containing the actual stream values. */ sendUpdateData(data: ArrayBufferLike): void + + /** + * Send an updated structure description describing the layout of the + * forthcoming data packages. + */ sendUpdateStructure(structure: ArrayBufferLike): void } \ No newline at end of file diff --git a/packages/lib/fusion/src/live-stream/Subscribers.ts b/packages/lib/fusion/src/live-stream/Subscribers.ts index 1520f3b14..21c954ecc 100644 --- a/packages/lib/fusion/src/live-stream/Subscribers.ts +++ b/packages/lib/fusion/src/live-stream/Subscribers.ts @@ -3,15 +3,26 @@ import {Address} from "@opendaw/lib-box" type ListenersEntry = { address: Address, listeners: Array } +/** + * Helper structure for mapping addresses to listener arrays. It supports + * subscription management and automatic cleanup when all listeners of an + * address are removed. + */ export class Subscribers implements Terminable { readonly #subscribers: SortedSet> constructor() {this.#subscribers = Address.newSet>(entry => entry.address)} + /** Retrieve listeners for an address if present. */ getOrNull(address: Address): Nullish> {return this.#subscribers.getOrNull(address)?.listeners} + /** Check whether an address has any subscribers. */ isEmpty(address: Address): boolean {return !this.#subscribers.hasKey(address) } + /** + * Add a listener for the given address and return a subscription used to + * remove it. + */ subscribe(address: Address, listener: T): Subscription { const entry = this.#subscribers.getOrNull(address) if (isDefined(entry)) { @@ -31,5 +42,6 @@ export class Subscribers implements Terminable { } } + /** Remove all listeners from all addresses. */ terminate(): void {this.#subscribers.clear()} } \ No newline at end of file