diff --git a/packages/app/studio/src/service/SessionService.ts b/packages/app/studio/src/service/SessionService.ts index aefcb99dd..2d52932d5 100644 --- a/packages/app/studio/src/service/SessionService.ts +++ b/packages/app/studio/src/service/SessionService.ts @@ -1,5 +1,7 @@ /** * Service handling persistence and lifecycle operations for project sessions. + * Works in concert with {@link SyncLogService} for commit logging and emits + * {@link StudioSignal} events to update the UI. */ import {ProjectSession} from "@/project/ProjectSession" import { diff --git a/packages/app/studio/src/service/StudioSignal.ts b/packages/app/studio/src/service/StudioSignal.ts index 24dac68e9..25664512b 100644 --- a/packages/app/studio/src/service/StudioSignal.ts +++ b/packages/app/studio/src/service/StudioSignal.ts @@ -3,6 +3,8 @@ import {Sample} from "@opendaw/studio-adapters" /** * Events broadcast by {@link StudioService} to update various UI elements. + * These complement the {@link SessionService} lifecycle and other session + * oriented services. * * ```mermaid * classDiagram diff --git a/packages/app/studio/src/service/SyncLogService.ts b/packages/app/studio/src/service/SyncLogService.ts index e151f68c5..30e84370e 100644 --- a/packages/app/studio/src/service/SyncLogService.ts +++ b/packages/app/studio/src/service/SyncLogService.ts @@ -6,7 +6,9 @@ import {StudioService} from "@/service/StudioService" import {Commit, SyncLogReader, SyncLogWriter} from "@opendaw/studio-core" /** - * Service helpers for writing and appending project SyncLog files. + * Service helpers for writing and appending project SyncLog files. Intended to + * be used alongside the {@link SessionService} to persist collaborative + * session history. * * ```mermaid * flowchart LR diff --git a/packages/docs/docs-dev/architecture/overview.md b/packages/docs/docs-dev/architecture/overview.md index d8253352b..a7748ee2a 100644 --- a/packages/docs/docs-dev/architecture/overview.md +++ b/packages/docs/docs-dev/architecture/overview.md @@ -79,8 +79,9 @@ For a deeper look at timing, see the [audio path](./audio-path.md) and the - **Config** – Delivers runtime and build settings consumed by other components. Communication between these parts is based on lightweight message channels; see -the [messaging architecture](./messaging.md) for details. Concrete worker -protocols are described in the [Fusion docs](../fusion/overview.md). +the [messaging architecture](./messaging.md) for details. For real-time +collaboration features, explore the [Live Stream service](../services/live-stream.md) +and the user-facing [collaboration guide](../../docs-user/features/collaboration.md). ## Worker Lifecycle diff --git a/packages/docs/docs-dev/services/live-stream.md b/packages/docs/docs-dev/services/live-stream.md new file mode 100644 index 000000000..e064db146 --- /dev/null +++ b/packages/docs/docs-dev/services/live-stream.md @@ -0,0 +1,10 @@ +# Live Stream Service + +Streams real-time data between workers and the UI using shared memory buffers. + +```mermaid +flowchart LR + broadcaster -->|SharedArrayBuffer| receiver +``` + +See the user-facing [collaboration feature](../../docs-user/features/collaboration.md) for how this service is used. diff --git a/packages/docs/docs-user/features/collaboration.md b/packages/docs/docs-user/features/collaboration.md new file mode 100644 index 000000000..71aeb1d2f --- /dev/null +++ b/packages/docs/docs-user/features/collaboration.md @@ -0,0 +1,13 @@ +# Collaboration + +Share project changes in real time with other users. + +## Start a session +1. Open the collaboration menu. +2. Invite participants and begin sharing. + +## Leave a session +1. Open the collaboration menu. +2. Choose **Disconnect** to stop sharing updates. + +Developers can learn about the underlying service in the [Live Stream documentation](../../docs-dev/services/live-stream.md). diff --git a/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts b/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts index 89adcfb2c..bb4669a04 100644 --- a/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts +++ b/packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts @@ -25,9 +25,10 @@ interface Package { } /** - * Broadcasts typed data packages over a {@link Messenger} channel. Packages - * are written into a shared buffer and synchronised with connected - * {@link LiveStreamReceiver} instances using a simple lock. + * Serialises values provided by various publishers into a shared buffer and + * forwards updates to a {@link LiveStreamReceiver}. The broadcaster owns the + * data buffer and controls when receivers may read from it by toggling a + * shared {@link Lock}. */ export class LiveStreamBroadcaster { /** @@ -61,8 +62,10 @@ export class LiveStreamBroadcaster { } /** - * Flushes any pending structural updates and, when permitted by the shared - * lock, writes the latest package data to the output buffer. + * Flushes the pending packages into the shared buffer. When the structure + * changes, the receiver is first notified with a new layout description and + * data buffer. Actual values are written only when the consumer signals that + * it is ready by setting the {@link Lock} to {@link Lock.WRITE}. */ flush(): void { const update = this.#updateAvailable() @@ -71,9 +74,9 @@ export class LiveStreamBroadcaster { this.#sender.sendShareLock(this.#lock) this.#lockShared = true } - this.#sender.sendUpdateStructure(update.unwrap()) - const capacity = this.#computeCapacity() - if (this.#output.remaining < capacity) { + this.#sender.sendUpdateStructure(update.unwrap()) + const capacity = this.#computeCapacity() + if (this.#output.remaining < capacity) { const size = nextPowOf2(capacity) const data = new SharedArrayBuffer(size) this.#output = ByteArrayOutput.use(data) @@ -82,8 +85,9 @@ export class LiveStreamBroadcaster { } } if (this.#sabOption.isEmpty()) {return} - // If main-thread is not interested, no data will ever be sent again, since it will not set the lock to CAN_WRITE. - // No lock is necessary since the other side skips reading until we set the lock to CAN_READ. + // If main-thread is not interested, no data will ever be sent again, + // since it will not set the lock to CAN_WRITE. No lock is necessary since + // the other side skips reading until we set the lock to CAN_READ. if (Atomics.load(this.#lockArray, 0) === Lock.WRITE) { this.#flushData(this.#output) this.#output.position = 0 @@ -173,7 +177,7 @@ export class LiveStreamBroadcaster { return this.#capacity } - /** Clears any registered packages and releases internal resources. */ + /** Clears all stored packages and resets state. */ terminate(): void { Arrays.clear(this.#packages) this.#availableUpdate = Option.None @@ -181,6 +185,7 @@ export class LiveStreamBroadcaster { this.#capacity = 0 } + /** Writes the version and all package payloads into the buffer. */ #flushData(output: ByteArrayOutput): void { assert(!this.#invalid && this.#availableUpdate.isEmpty(), "Cannot flush while update is available") const requiredCapacity = this.#computeCapacity() @@ -191,10 +196,12 @@ export class LiveStreamBroadcaster { output.writeInt(Flags.END) } + /** Computes the sum of capacities for all registered packages. */ #sumRequiredCapacity(): int { return this.#packages.reduce((sum, pack) => sum + pack.capacity, 0) } + /** Registers a package and returns a handle to remove it again. */ #storeChunk(pack: Package): Terminable { this.#packages.push(pack) this.#invalidate() @@ -206,11 +213,13 @@ export class LiveStreamBroadcaster { } } + /** Marks the structure as outdated. */ #invalidate(): void { this.#capacity = -1 this.#invalid = true } + /** Builds a structure description listing all packages and their types. */ #compileStructure(): ArrayBufferLike { const output = ByteArrayOutput.create() output.writeInt(Flags.ID) diff --git a/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts b/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts index 0a5ec0111..c844697d3 100644 --- a/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts +++ b/packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts @@ -21,6 +21,11 @@ import {Protocol} from "./Protocol" import {Lock} from "./Lock" import {Flags} from "./Flags" +/** + * Consumes live stream updates produced by a {@link LiveStreamBroadcaster}. + * The receiver reacts to structural changes, manages subscriptions and + * dispatches incoming values to listeners on every animation frame. + */ interface Package extends Terminable { dispatch(address: Address, input: ByteArrayInput): void subscribe(address: Address, procedure: Procedure): Subscription @@ -145,8 +150,9 @@ export class LiveStreamReceiver implements Terminable { } /** - * Connects the receiver to a messenger channel and starts processing - * incoming updates. Returns a {@link Terminable} to disconnect. + * Connects the receiver to a messenger channel and starts dispatching + * frames. The returned terminable tears down the connection and stops + * processing when invoked. */ connect(messenger: Messenger): Terminable { assert(!this.#connected, "Already connected") @@ -203,6 +209,10 @@ export class LiveStreamReceiver implements Terminable { /** Disconnects from the messenger and clears subscriptions. */ terminate(): void {this.#disconnect()} + /** + * Called for every animation frame; checks the shared lock and dispatches + * the latest data package to all registered handlers when available. + */ #dispatch(): void { if (this.#optLock.isEmpty() || this.#memory.isEmpty()) {return} const lock = this.#optLock.unwrap() @@ -214,11 +224,16 @@ export class LiveStreamReceiver implements Terminable { } } + /** Rebuilds dispatch procedures when the stream structure changes. */ #updateStructure(input: ByteArrayInput): void { Arrays.clear(this.#procedures) this.#parseStructure(input) } + /** + * Reads a data block and executes the stored procedures in order. Outdated + * versions are skipped until a matching structure arrives. + */ #dispatchData(input: ByteArrayInput): boolean { const version = input.readInt() if (version !== this.#structureVersion) { @@ -231,6 +246,10 @@ export class LiveStreamReceiver implements Terminable { return true } + /** + * Parses a structure update and prepares dispatch procedures for each + * registered address/type pair. + */ #parseStructure(input: ByteArrayInput): void { if (input.readInt() !== Flags.ID) {throw new Error("no valid id")} const version = input.readInt() diff --git a/packages/lib/fusion/src/live-stream/Protocol.ts b/packages/lib/fusion/src/live-stream/Protocol.ts index b43072fc0..8ae0b6d23 100644 --- a/packages/lib/fusion/src/live-stream/Protocol.ts +++ b/packages/lib/fusion/src/live-stream/Protocol.ts @@ -1,12 +1,18 @@ /** - * Messaging contract used between the broadcaster and receiver. All methods - * forward updates over a shared channel without awaiting a response. + * Defines the messaging contract between a {@link LiveStreamBroadcaster} + * and a {@link LiveStreamReceiver}. Implementations forward shared memory + * references and update packets across thread boundaries. */ export interface Protocol { - /** Shares the lock that synchronises read/write access to data buffers. */ + /** Share the lock used to coordinate access to the data buffer. */ sendShareLock(lock: SharedArrayBuffer): void - /** Sends the data buffer containing audio frames. */ + + /** Transfer a new data buffer containing the actual stream values. */ sendUpdateData(data: ArrayBufferLike): void - /** Sends structural information about the current stream layout. */ + + /** + * Send an updated structure description describing the layout of the + * forthcoming data packages. + */ sendUpdateStructure(structure: ArrayBufferLike): void } \ No newline at end of file