Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/docs/docs-dev/architecture/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ learn how to build the project in [Build and Run](../build-and-run/setup.md).
- **Config** – Delivers runtime and build settings consumed by other components.

Communication between these parts is based on lightweight message channels; see
the [messaging architecture](./messaging.md) for details.
the [messaging architecture](./messaging.md) for details. Concrete worker
protocols are described in the [Fusion docs](../fusion/overview.md).

## Worker Lifecycle

Expand Down
3 changes: 3 additions & 0 deletions packages/docs/docs-dev/fusion/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ const peaks = await worker.generate(buffer)
const opfs = new OpfsWorker()
await opfs.writeFile('demo.wav', data)
```

More details are available in the [OPFS](opfs.md),
[Live Stream](live-stream.md) and [Peaks](peaks.md) guides.
5 changes: 5 additions & 0 deletions packages/docs/docs-dev/fusion/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@

**A:** Any browser with Web Worker and OPFS support. Chrome-based browsers
currently provide the most complete feature set.

**Q:** Where can I learn more about the protocols?

**A:** See the [Fusion overview](overview.md) and related pages for details on
OPFS, live streaming and peak generation.
13 changes: 13 additions & 0 deletions packages/docs/docs-dev/fusion/live-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Live Stream

`LiveStreamBroadcaster` and `LiveStreamReceiver` exchange typed packages over a
message channel. Data is written into a shared buffer and guarded by a
single-byte lock.

```ts
const broadcaster = LiveStreamBroadcaster.create(messenger, 'audio')
broadcaster.broadcastFloats(address, buffer, update)

const receiver = new LiveStreamReceiver()
receiver.connect(messenger)
```
13 changes: 13 additions & 0 deletions packages/docs/docs-dev/fusion/opfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# OPFS Worker

The `OpfsWorker` provides access to the Origin Private File System from a web
worker. It implements the [`OpfsProtocol`](../../../lib/fusion/src/opfs/OpfsProtocol.ts)
so the main thread can read, write and enumerate files without blocking.

```ts
// Initialize the worker
OpfsWorker.init(messenger)

// Write data
await protocol.write('demo.txt', data)
```
4 changes: 4 additions & 0 deletions packages/docs/docs-dev/fusion/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ The Fusion utilities provide structured messaging channels for audio and file
operations within openDAW. Live streaming, waveform peak generation and Origin
Private File System (OPFS) access are exposed through dedicated workers and
protocols.

- [OPFS worker](opfs.md)
- [Live streaming](live-stream.md)
- [Waveform peaks](peaks.md)
13 changes: 13 additions & 0 deletions packages/docs/docs-dev/fusion/peaks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Peaks

The peaks utilities generate multi-resolution waveform overviews.

* `SamplePeakWorker` computes peak data from raw audio frames.
* `Peaks` structures the data for quick lookups.
* `PeaksPainter` renders the peaks onto a canvas.

```ts
const worker = SamplePeakWorker.install(messenger)
const buffer = await protocol.generateAsync(progress, shifts, frames, numFrames, numChannels)
const peaks = SamplePeaks.from(new ByteArrayInput(buffer))
```
2 changes: 2 additions & 0 deletions packages/docs/docs-user/workflows/exporting.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ flowchart TD
6. **Download or share.** Save the resulting file to your computer or send the bundle to a collaborator.

Use this workflow whenever you need to back up a project or deliver a finished track. For collaboration tips see the [Collaboration workflow](collaboration.md).
Developers can learn how exported files are written by the OPFS worker in the
[Fusion OPFS docs](../../docs-dev/fusion/opfs.md).
3 changes: 3 additions & 0 deletions packages/lib/fusion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Web-based audio workstation fusion utilities for TypeScript projects.

See the [API documentation](https://opendaw.org/docs/api/fusion/) for detailed reference.

For developer guides covering worker protocols and usage patterns see the
[Fusion docs](../docs/docs-dev/fusion/overview.md).

## File System Operations

* **OpfsWorker.ts** - Origin Private File System worker implementation
Expand Down
4 changes: 4 additions & 0 deletions packages/lib/fusion/src/live-stream/Flags.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* Markers used in the live-stream protocol to delimit structure and data
* sections.
*/
export const enum Flags {
ID = 0xF0FF0F, START = 0xF0F0F0, END = 0x0F0F0F
}
18 changes: 18 additions & 0 deletions packages/lib/fusion/src/live-stream/LiveStreamBroadcaster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ interface Package {
put(output: ByteArrayOutput): void
}

/**
* Broadcasts typed data packages over a {@link Messenger} channel. Packages
* are written into a shared buffer and synchronised with connected
* {@link LiveStreamReceiver} instances using a simple lock.
*/
export class LiveStreamBroadcaster {
/**
* Creates a broadcaster backed by a named messenger channel.
*/
static create(messenger: Messenger, name: string): LiveStreamBroadcaster {
return new LiveStreamBroadcaster(messenger.channel(name))
}
Expand Down Expand Up @@ -52,6 +60,10 @@ export class LiveStreamBroadcaster {
})
}

/**
* Flushes any pending structural updates and, when permitted by the shared
* lock, writes the latest package data to the output buffer.
*/
flush(): void {
const update = this.#updateAvailable()
if (update.nonEmpty()) {
Expand Down Expand Up @@ -79,6 +91,7 @@ export class LiveStreamBroadcaster {
}
}

/** Registers a float provider to be broadcast on each flush. */
broadcastFloat(address: Address, provider: Provider<float>): Terminable {
return this.#storeChunk(new class implements Package {
readonly type: PackageType = PackageType.Float
Expand All @@ -88,6 +101,7 @@ export class LiveStreamBroadcaster {
})
}

/** Registers an integer provider to be broadcast on each flush. */
broadcastInteger(address: Address, provider: Provider<int>): Terminable {
return this.#storeChunk(new class implements Package {
readonly type: PackageType = PackageType.Integer
Expand All @@ -97,6 +111,7 @@ export class LiveStreamBroadcaster {
})
}

/** Broadcasts a `Float32Array` after invoking the supplied update hook. */
broadcastFloats(address: Address, values: Float32Array, update: Exec): Terminable {
return this.#storeChunk(new class implements Package {
readonly type: PackageType = PackageType.FloatArray
Expand All @@ -110,6 +125,7 @@ export class LiveStreamBroadcaster {
})
}

/** Broadcasts an `Int32Array` after invoking the supplied update hook. */
broadcastIntegers(address: Address, values: Int32Array, update: Exec): Terminable {
return this.#storeChunk(new class implements Package {
readonly type: PackageType = PackageType.IntegerArray
Expand All @@ -123,6 +139,7 @@ export class LiveStreamBroadcaster {
})
}

/** Broadcasts a raw byte array after invoking the supplied update hook. */
broadcastByteArray(address: Address, values: Int8Array, update: Exec): Terminable {
return this.#storeChunk(new class implements Package {
readonly type: PackageType = PackageType.ByteArray
Expand Down Expand Up @@ -156,6 +173,7 @@ export class LiveStreamBroadcaster {
return this.#capacity
}

/** Clears any registered packages and releases internal resources. */
terminate(): void {
Arrays.clear(this.#packages)
this.#availableUpdate = Option.None
Expand Down
14 changes: 14 additions & 0 deletions packages/lib/fusion/src/live-stream/LiveStreamReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class ByteArrayPackage extends ArrayPackage<Int8Array> {
}
}

/**
* Receives packages produced by a {@link LiveStreamBroadcaster}. Subscribers
* can register for specific addresses to be notified when new data arrives.
*/
export class LiveStreamReceiver implements Terminable {
static ID: int = 0 | 0

Expand Down Expand Up @@ -140,6 +144,10 @@ export class LiveStreamReceiver implements Terminable {
this.#packages[PackageType.ByteArray] = this.#bytes
}

/**
* Connects the receiver to a messenger channel and starts processing
* incoming updates. Returns a {@link Terminable} to disconnect.
*/
connect(messenger: Messenger): Terminable {
assert(!this.#connected, "Already connected")
this.#connected = true
Expand Down Expand Up @@ -167,26 +175,32 @@ export class LiveStreamReceiver implements Terminable {
this.#bytes.terminate()
}

/** Subscribes to a single float value at the given address. */
subscribeFloat(address: Address, procedure: Procedure<int>): Subscription {
return this.#float.subscribe(address, procedure)
}

/** Subscribes to a single integer value at the given address. */
subscribeInteger(address: Address, procedure: Procedure<float>): Subscription {
return this.#integer.subscribe(address, procedure)
}

/** Subscribes to a `Float32Array` at the given address. */
subscribeFloats(address: Address, procedure: Procedure<Float32Array>): Subscription {
return this.#floats.subscribe(address, procedure)
}

/** Subscribes to an `Int32Array` at the given address. */
subscribeIntegers(address: Address, procedure: Procedure<Int32Array>): Subscription {
return this.#integers.subscribe(address, procedure)
}

/** Subscribes to a byte array at the given address. */
subscribeByteArray(address: Address, procedure: Procedure<Int8Array>): Subscription {
return this.#bytes.subscribe(address, procedure)
}

/** Disconnects from the messenger and clears subscriptions. */
terminate(): void {this.#disconnect()}

#dispatch(): void {
Expand Down
5 changes: 5 additions & 0 deletions packages/lib/fusion/src/live-stream/Lock.ts
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
/**
* Synchronisation states for the shared buffer used by broadcaster and
* receiver. `WRITE` indicates the broadcaster may write new data, `READ`
* signals that receivers can consume it.
*/
export enum Lock {WRITE = 0, READ = 1}
3 changes: 3 additions & 0 deletions packages/lib/fusion/src/live-stream/PackageType.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
/**
* Identifies the kind of data contained in a live-stream package.
*/
export enum PackageType {Float, FloatArray, Integer, IntegerArray, ByteArray}
7 changes: 7 additions & 0 deletions packages/lib/fusion/src/live-stream/Protocol.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
/**
* Messaging contract used between the broadcaster and receiver. All methods
* forward updates over a shared channel without awaiting a response.
*/
export interface Protocol {
/** Shares the lock that synchronises read/write access to data buffers. */
sendShareLock(lock: SharedArrayBuffer): void
/** Sends the data buffer containing audio frames. */
sendUpdateData(data: ArrayBufferLike): void
/** Sends structural information about the current stream layout. */
sendUpdateStructure(structure: ArrayBufferLike): void
}
12 changes: 12 additions & 0 deletions packages/lib/fusion/src/live-stream/Subscribers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@ import {Address} from "@opendaw/lib-box"

type ListenersEntry<T> = { address: Address, listeners: Array<T> }

/**
* Maintains a set of listeners grouped by address. Each address can have
* multiple subscribers which are automatically removed once their
* {@link Subscription} is terminated.
*/
export class Subscribers<T> implements Terminable {
readonly #subscribers: SortedSet<Address, ListenersEntry<T>>

constructor() {this.#subscribers = Address.newSet<ListenersEntry<T>>(entry => entry.address)}

/** Returns the listeners registered for the given address, if any. */
getOrNull(address: Address): Nullish<ReadonlyArray<T>> {return this.#subscribers.getOrNull(address)?.listeners}

/** Checks whether the address currently has no subscribers. */
isEmpty(address: Address): boolean {return !this.#subscribers.hasKey(address) }

/**
* Adds a listener for the specified address and returns a subscription
* handle that removes it when terminated.
*/
subscribe(address: Address, listener: T): Subscription {
const entry = this.#subscribers.getOrNull(address)
if (isDefined(entry)) {
Expand All @@ -31,5 +42,6 @@ export class Subscribers<T> implements Terminable {
}
}

/** Removes all listeners for all addresses. */
terminate(): void {this.#subscribers.clear()}
}
12 changes: 12 additions & 0 deletions packages/lib/fusion/src/opfs/OpfsProtocol.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
/** Describes an OPFS entry type. */
export type Kind = "file" | "directory"

/** A single file system entry returned by {@link OpfsProtocol.list}. */
export type Entry = { name: string, kind: Kind }

/**
* Contract describing operations supported by the OPFS worker.
* Implementations perform synchronous file system tasks and return results
* via promises to the main thread.
*/
export interface OpfsProtocol {
/** Writes data to the given file path. */
write(path: string, data: Uint8Array): Promise<void>
/** Reads the contents of the specified file path. */
read(path: string): Promise<Uint8Array>
/** Removes a file or directory at the supplied path. */
delete(path: string): Promise<void>
/** Lists entries within the directory at the supplied path. */
list(path: string): Promise<ReadonlyArray<Entry>>
}
15 changes: 15 additions & 0 deletions packages/lib/fusion/src/opfs/OpfsWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ import {Communicator, Messenger, Promises} from "@opendaw/lib-runtime"
import {Entry, OpfsProtocol} from "./OpfsProtocol"
import "../types"

/**
* Implements Origin Private File System (OPFS) operations inside a worker.
*
* The {@link init} function wires the worker to a {@link Messenger} channel
* and exposes the {@link OpfsProtocol} so the main thread can perform file
* system tasks without direct access to privileged APIs.
*/
export namespace OpfsWorker {
const DEBUG = false
const readLimiter = new Promises.Limit<Uint8Array>(1)
const writeLimiter = new Promises.Limit<void>(1)

/**
* Registers the worker on the given messenger and starts listening for
* {@link OpfsProtocol} commands.
*/
export const init = (messenger: Messenger) =>
Communicator.executor(messenger.channel("opfs"), new class implements OpfsProtocol {
async write(path: string, data: Uint8Array): Promise<void> {
Expand Down Expand Up @@ -65,6 +76,10 @@ export namespace OpfsWorker {
}
})

/**
* Splits a POSIX style path into individual segments while removing
* leading and trailing slashes.
*/
const pathToSegments = (path: string): ReadonlyArray<string> => {
const noSlashes = path.replace(/^\/+|\/+$/g, "")
return noSlashes === "" ? [] : noSlashes.split("/")
Expand Down
Loading