Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/app/studio/src/service/SessionService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/**
* Service handling persistence and lifecycle operations for project sessions.
* Works in concert with {@link SyncLogService} for commit logging and emits
* {@link StudioSignal} events to update the UI.
*/
import {ProjectSession} from "@/project/ProjectSession"
import {
Expand Down
2 changes: 2 additions & 0 deletions packages/app/studio/src/service/StudioSignal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {Sample} from "@opendaw/studio-adapters"

/**
* Events broadcast by {@link StudioService} to update various UI elements.
* These complement the {@link SessionService} lifecycle and other session
* oriented services.
*
* ```mermaid
* classDiagram
Expand Down
4 changes: 3 additions & 1 deletion packages/app/studio/src/service/SyncLogService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import {StudioService} from "@/service/StudioService"
import {Commit, SyncLogReader, SyncLogWriter} from "@opendaw/studio-core"

/**
* Service helpers for writing and appending project SyncLog files.
* Service helpers for writing and appending project SyncLog files. Intended to
* be used alongside the {@link SessionService} to persist collaborative
* session history.
*
* ```mermaid
* flowchart LR
Expand Down
5 changes: 3 additions & 2 deletions packages/docs/docs-dev/architecture/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ For a deeper look at timing, see the [audio path](./audio-path.md) and the
- **Config** – Delivers runtime and build settings consumed by other components.

Communication between these parts is based on lightweight message channels; see
the [messaging architecture](./messaging.md) for details. Concrete worker
protocols are described in the [Fusion docs](../fusion/overview.md).
the [messaging architecture](./messaging.md) for details. For real-time
collaboration features, explore the [Live Stream service](../services/live-stream.md)
and the user-facing [collaboration guide](../../docs-user/features/collaboration.md).

## Worker Lifecycle

Expand Down
10 changes: 10 additions & 0 deletions packages/docs/docs-dev/services/live-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Live Stream Service

Streams real-time data between workers and the UI using shared memory buffers.

```mermaid
flowchart LR
broadcaster -->|SharedArrayBuffer| receiver
```

See the user-facing [collaboration feature](../../docs-user/features/collaboration.md) for how this service is used.
13 changes: 13 additions & 0 deletions packages/docs/docs-user/features/collaboration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Collaboration

Share project changes in real time with other users.

## Start a session
1. Open the collaboration menu.
2. Invite participants and begin sharing.

## Leave a session
1. Open the collaboration menu.
2. Choose **Disconnect** to stop sharing updates.

Developers can learn about the underlying service in the [Live Stream documentation](../../docs-dev/services/live-stream.md).
31 changes: 20 additions & 11 deletions packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ interface Package {
}

/**
* Broadcasts typed data packages over a {@link Messenger} channel. Packages
* are written into a shared buffer and synchronised with connected
* {@link LiveStreamReceiver} instances using a simple lock.
* Serialises values provided by various publishers into a shared buffer and
* forwards updates to a {@link LiveStreamReceiver}. The broadcaster owns the
* data buffer and controls when receivers may read from it by toggling a
* shared {@link Lock}.
*/
export class LiveStreamBroadcaster {
/**
Expand Down Expand Up @@ -61,8 +62,10 @@ export class LiveStreamBroadcaster {
}

/**
* Flushes any pending structural updates and, when permitted by the shared
* lock, writes the latest package data to the output buffer.
* Flushes the pending packages into the shared buffer. When the structure
* changes, the receiver is first notified with a new layout description and
* data buffer. Actual values are written only when the consumer signals that
* it is ready by setting the {@link Lock} to {@link Lock.WRITE}.
*/
flush(): void {
const update = this.#updateAvailable()
Expand All @@ -71,9 +74,9 @@ export class LiveStreamBroadcaster {
this.#sender.sendShareLock(this.#lock)
this.#lockShared = true
}
this.#sender.sendUpdateStructure(update.unwrap())
const capacity = this.#computeCapacity()
if (this.#output.remaining < capacity) {
this.#sender.sendUpdateStructure(update.unwrap())
const capacity = this.#computeCapacity()
if (this.#output.remaining < capacity) {
const size = nextPowOf2(capacity)
const data = new SharedArrayBuffer(size)
this.#output = ByteArrayOutput.use(data)
Expand All @@ -82,8 +85,9 @@ export class LiveStreamBroadcaster {
}
}
if (this.#sabOption.isEmpty()) {return}
// If main-thread is not interested, no data will ever be sent again, since it will not set the lock to CAN_WRITE.
// No lock is necessary since the other side skips reading until we set the lock to CAN_READ.
// If main-thread is not interested, no data will ever be sent again,
// since it will not set the lock to CAN_WRITE. No lock is necessary since
// the other side skips reading until we set the lock to CAN_READ.
if (Atomics.load(this.#lockArray, 0) === Lock.WRITE) {
this.#flushData(this.#output)
this.#output.position = 0
Expand Down Expand Up @@ -173,14 +177,15 @@ export class LiveStreamBroadcaster {
return this.#capacity
}

/** Clears any registered packages and releases internal resources. */
/** Clears all stored packages and resets state. */
terminate(): void {
Arrays.clear(this.#packages)
this.#availableUpdate = Option.None
this.#invalid = false
this.#capacity = 0
}

/** Writes the version and all package payloads into the buffer. */
#flushData(output: ByteArrayOutput): void {
assert(!this.#invalid && this.#availableUpdate.isEmpty(), "Cannot flush while update is available")
const requiredCapacity = this.#computeCapacity()
Expand All @@ -191,10 +196,12 @@ export class LiveStreamBroadcaster {
output.writeInt(Flags.END)
}

/** Computes the sum of capacities for all registered packages. */
#sumRequiredCapacity(): int {
return this.#packages.reduce((sum, pack) => sum + pack.capacity, 0)
}

/** Registers a package and returns a handle to remove it again. */
#storeChunk(pack: Package): Terminable {
this.#packages.push(pack)
this.#invalidate()
Expand All @@ -206,11 +213,13 @@ export class LiveStreamBroadcaster {
}
}

/** Marks the structure as outdated. */
#invalidate(): void {
this.#capacity = -1
this.#invalid = true
}

/** Builds a structure description listing all packages and their types. */
#compileStructure(): ArrayBufferLike {
const output = ByteArrayOutput.create()
output.writeInt(Flags.ID)
Expand Down
23 changes: 21 additions & 2 deletions packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import {Protocol} from "./Protocol"
import {Lock} from "./Lock"
import {Flags} from "./Flags"

/**
* Consumes live stream updates produced by a {@link LiveStreamBroadcaster}.
* The receiver reacts to structural changes, manages subscriptions and
* dispatches incoming values to listeners on every animation frame.
*/
interface Package<T> extends Terminable {
dispatch(address: Address, input: ByteArrayInput): void
subscribe(address: Address, procedure: Procedure<T>): Subscription
Expand Down Expand Up @@ -145,8 +150,9 @@ export class LiveStreamReceiver implements Terminable {
}

/**
* Connects the receiver to a messenger channel and starts processing
* incoming updates. Returns a {@link Terminable} to disconnect.
* Connects the receiver to a messenger channel and starts dispatching
* frames. The returned terminable tears down the connection and stops
* processing when invoked.
*/
connect(messenger: Messenger): Terminable {
assert(!this.#connected, "Already connected")
Expand Down Expand Up @@ -203,6 +209,10 @@ export class LiveStreamReceiver implements Terminable {
/** Disconnects from the messenger and clears subscriptions. */
terminate(): void {this.#disconnect()}

/**
* Called for every animation frame; checks the shared lock and dispatches
* the latest data package to all registered handlers when available.
*/
#dispatch(): void {
if (this.#optLock.isEmpty() || this.#memory.isEmpty()) {return}
const lock = this.#optLock.unwrap()
Expand All @@ -214,11 +224,16 @@ export class LiveStreamReceiver implements Terminable {
}
}

/** Rebuilds dispatch procedures when the stream structure changes. */
#updateStructure(input: ByteArrayInput): void {
Arrays.clear(this.#procedures)
this.#parseStructure(input)
}

/**
* Reads a data block and executes the stored procedures in order. Outdated
* versions are skipped until a matching structure arrives.
*/
#dispatchData(input: ByteArrayInput): boolean {
const version = input.readInt()
if (version !== this.#structureVersion) {
Expand All @@ -231,6 +246,10 @@ export class LiveStreamReceiver implements Terminable {
return true
}

/**
* Parses a structure update and prepares dispatch procedures for each
* registered address/type pair.
*/
#parseStructure(input: ByteArrayInput): void {
if (input.readInt() !== Flags.ID) {throw new Error("no valid id")}
const version = input.readInt()
Expand Down
16 changes: 11 additions & 5 deletions packages/lib/fusion/src/live-stream/Protocol.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
/**
* Messaging contract used between the broadcaster and receiver. All methods
* forward updates over a shared channel without awaiting a response.
* Defines the messaging contract between a {@link LiveStreamBroadcaster}
* and a {@link LiveStreamReceiver}. Implementations forward shared memory
* references and update packets across thread boundaries.
*/
export interface Protocol {
/** Shares the lock that synchronises read/write access to data buffers. */
/** Share the lock used to coordinate access to the data buffer. */
sendShareLock(lock: SharedArrayBuffer): void
/** Sends the data buffer containing audio frames. */

/** Transfer a new data buffer containing the actual stream values. */
sendUpdateData(data: ArrayBufferLike): void
/** Sends structural information about the current stream layout. */

/**
* Send an updated structure description describing the layout of the
* forthcoming data packages.
*/
sendUpdateStructure(structure: ArrayBufferLike): void
}