Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/docs/docs-dev/architecture/ring-buffers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Ring Buffers

openDAW transfers audio between threads using shared-memory ring buffers. The
[`RingBuffer` helper](../../studio/adapters/src/RingBuffer.ts) manages a
`SharedArrayBuffer` that stores planar chunks of audio data and exposes
lightweight reader and writer utilities. Worklets such as the
[`RecordingWorklet`](../../studio/core/src/Worklets.ts) use this mechanism to
move audio from the real-time engine to the main thread without costly
structured cloning.

Each ring buffer reserves a header with atomic pointers followed by a contiguous
block of channel data. Writers advance the write pointer after copying a block
and notify readers via `Atomics.notify`, while readers block or poll until new
chunks are available. This design minimises latency and garbage creation when
streaming audio between the engine, worklets and UI.
11 changes: 11 additions & 0 deletions packages/docs/docs-user/performance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Performance Guide

openDAW is designed to remain responsive even in large sessions. Close other
applications and browser tabs if you encounter glitches. Increasing the audio
buffer size in settings can give the CPU more time to process complex
arrangements.

The engine streams audio between threads using shared-memory ring buffers. This
avoids expensive structured cloning and keeps latency low. For a deep dive into
the design, see the [ring buffer architecture
notes](../docs-dev/architecture/ring-buffers.md).
21 changes: 21 additions & 0 deletions packages/lib/runtime/src/communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ export namespace Communicator {
};
}

/**
* Executes protocol functions received from a remote {@link Sender}.
* Instances are created via {@link Communicator.executor}.
*/
export class Executor<PROTOCOL> implements Terminable {
readonly #messenger: Messenger;
readonly #protocol: PROTOCOL;
Expand Down Expand Up @@ -256,36 +260,53 @@ export namespace Communicator {
this.#messenger.send({ type: "callback", returnId, funcAt: func, args });
}

/**
* Message describing a function call.
*/
type Send<T> = {
/** Discriminator indicating a call request. */
type: "send";
/** Name of the function to invoke on the executor. */
func: keyof T;
/** Serialised arguments for the function. */
args: Arg[];
/** Identifier used to correlate promises or `false` for fire-and-forget. */
returnId: int | false;
};

/** Message sent to execute a callback previously passed as an argument. */
type Callback = {
type: "callback";
/** Index of the original callback argument. */
funcAt: int;
/** Arguments forwarded to the callback. */
args: Arg[];
/** Identifier linking back to the originating call. */
returnId: int;
};

/** Response resolving a pending call. */
type Resolve = {
type: "resolve";
/** Value returned by the remote execution. */
resolve: any;
returnId: int;
};

/** Response rejecting a pending call. */
type Reject = {
type: "reject";
/** Reason supplied by the remote execution. */
reject: any;
returnId: int;
};

/** Internal bookkeeping for outstanding calls. */
type Return = {
executorTuple: ExecutorTuple<any>;
callbacks?: Map<int, Function>;
};

/** Either a plain value or a placeholder for a callback argument. */
type Arg = { value: any } | { callback: int };
}
6 changes: 6 additions & 0 deletions packages/lib/runtime/src/messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ export type Messenger = Observable<any> & Terminable & {
channel(name: string): Messenger
}

/**
* Default {@link Messenger} implementation forwarding messages to a {@link Port}.
*/
class NativeMessenger implements Messenger {
readonly #port: Port
readonly #notifier = new Notifier<any>()
Expand Down Expand Up @@ -107,6 +110,9 @@ class NativeMessenger implements Messenger {
}
}

/**
* Logical sub-channel that filters messages by name.
*/
class Channel implements Messenger {
readonly #messages: Messenger
readonly #name: string
Expand Down
23 changes: 23 additions & 0 deletions packages/lib/std/src/sync-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,31 @@ export namespace SyncStream {
WRITTEN,
}

/**
* Exposes a method that attempts to write structured data into a shared buffer.
*/
export interface Writer {
/** Attempts to serialise a value into the buffer. */
readonly tryWrite: Provider<boolean>;
}

/**
* Represents the reading side of a synchronous stream.
*/
export interface Reader {
/** Shared buffer populated by a corresponding {@link Writer}. */
readonly buffer: SharedArrayBuffer;
/** Attempts to read and deserialize the next value. */
readonly tryRead: Provider<boolean>;
}

/**
* Creates a {@link Writer} that serialises objects of type `T` into the provided buffer.
*
* @param io - Schema used to encode objects.
* @param buffer - Shared memory backing the stream.
* @param populate - Function that fills the schema's object prior to writing.
*/
export const writer = <T extends object>(
io: Schema.IO<T>,
buffer: SharedArrayBuffer,
Expand Down Expand Up @@ -50,6 +66,13 @@ export namespace SyncStream {
};
};

/**
* Creates a {@link Reader} that deserialises objects of type `T` from a shared buffer.
*
* @param io - Schema used to decode objects.
* @param procedure - Callback receiving each decoded instance.
* @returns Reader exposing the shared buffer and a polling function.
*/
export const reader = <T extends object>(
io: Schema.IO<T>,
procedure: Procedure<T>,
Expand Down
28 changes: 24 additions & 4 deletions packages/studio/adapters/src/RingBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ declare let document: any
* {@link @opendaw/studio-core-processors#RecordingProcessor | RecordingProcessor}.
*/
export namespace RingBuffer {
/** Configuration for a ring buffer. */
/**
* Parameters describing the shared ring buffer layout.
*/
export interface Config {
/** Shared memory backing the ring buffer. */
sab: SharedArrayBuffer
/** Maximum number of chunks stored at any time. */
numChunks: int
/** Number of audio channels per chunk. */
numberOfChannels: int
/** Number of samples in each channel of a chunk. */
bufferSize: int
}

Expand All @@ -24,6 +30,10 @@ export namespace RingBuffer {

/**
* Creates a reader that appends each chunk to the provided callback.
*
* @param config - Shared buffer configuration.
* @param append - Callback receiving decoded channel data for each chunk.
* @returns Reader that can be stopped to cease iteration.
*/
export const reader = ({
sab,
Expand Down Expand Up @@ -71,6 +81,9 @@ export namespace RingBuffer {

/**
* Creates a writer for the given configuration.
*
* @param config - Layout of the underlying shared buffer.
* @returns Object exposing a {@link Writer.write | write} method.
*/
export const writer = ({sab, numChunks, numberOfChannels, bufferSize}: Config): Writer => {
const pointers = new Int32Array(sab, 0, 2)
Expand All @@ -97,10 +110,17 @@ export namespace RingBuffer {
}
/**
* Flattens an array of planar chunks into contiguous channel buffers.
*
* @param chunks - Recorded audio split into sequential chunks.
* @param bufferSize - Number of frames in each chunk.
* @param maxFrames - Optional cap for the total number of frames copied.
* @returns Array of channels with concatenated audio data.
*/
export const mergeChunkPlanes = (chunks: ReadonlyArray<ReadonlyArray<Float32Array>>,
bufferSize: int,
maxFrames: int = Number.MAX_SAFE_INTEGER): ReadonlyArray<Float32Array> => {
export const mergeChunkPlanes = (
chunks: ReadonlyArray<ReadonlyArray<Float32Array>>,
bufferSize: int,
maxFrames: int = Number.MAX_SAFE_INTEGER,
): ReadonlyArray<Float32Array> => {
if (chunks.length === 0) {return Arrays.empty()}
const numChannels = chunks[0].length
const numFrames = Math.min(bufferSize * chunks.length, maxFrames)
Expand Down
13 changes: 12 additions & 1 deletion packages/studio/core-processors/src/AudioUnit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,31 @@ export class AudioUnit implements Terminable {

/**
* Returns the currently connected input processor if any.
*
* @returns Optional instrument or bus feeding this unit.
*/
input(): Option<InstrumentDeviceProcessor | AudioBusProcessor> {return this.#input}
/**
* Convenience wrapper that asserts the input is an {@link AudioBusProcessor}.
*
* @throws Error if the input is missing or not an {@link AudioBusProcessor}.
*/
inputAsAudioBus(): AudioBusProcessor {return asInstanceOf(this.#input.unwrap("No input available"), AudioBusProcessor)}
/** Access to the channel strip's audio output. */
/**
* Access to the channel strip's audio output.
*
* @returns Buffer representing the processed output of the unit.
*/
audioOutput(): AudioBuffer {return this.#audioDeviceChain.channelStrip.audioOutput}

get midiDeviceChain(): MidiDeviceChain {return this.#midiDeviceChain}
get audioDeviceChain(): AudioDeviceChain {return this.#audioDeviceChain}
get context(): EngineContext {return this.#context}
get adapter(): AudioUnitBoxAdapter {return this.#adapter}

/**
* Releases all resources and disconnects processors.
*/
terminate(): void {
console.debug(`terminate ${this}`)
this.#terminator.terminate()
Expand Down
20 changes: 17 additions & 3 deletions packages/studio/core-processors/src/EventBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,25 @@ export class EventBuffer {

constructor() {this.#events = new ArrayMultimap(Arrays.empty(), Event.Comparator)}

/** Adds an event scheduled for the given block. */
/**
* Adds an event scheduled for the given block.
*
* @param index - Render block index for which the event should fire.
* @param event - Event instance to store.
*/
add(index: int, event: Event): void {this.#events.add(index, event)}
/** Returns the events for the block or an empty array. */
/**
* Returns the events for the block or an empty array.
*
* @param index - Block index to retrieve events for.
* @returns All events scheduled for that block.
*/
get(index: int): ReadonlyArray<Event> {return this.#events.get(index)}
/** Iterates over all stored block/event pairs. */
/**
* Iterates over all stored block/event pairs.
*
* @param procedure - Callback invoked with the block index and its events.
*/
forEach(procedure: (index: int, values: ReadonlyArray<Event>) => void): void {return this.#events.forEach(procedure)}
/** Clears all stored events. */
clear(): void {this.#events.clear()}
Expand Down
54 changes: 29 additions & 25 deletions packages/studio/core-processors/src/UpdateClock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,37 @@ export class UpdateClock extends AbstractProcessor {
this.own(this.context.registerProcessor(this));
}

/** Clears all pending events from the input buffer. */
reset(): void {
this.eventInput.clear();
}
/** Clears pending input events. */
reset(): void {this.eventInput.clear()}

/** Adds another event buffer that should receive update events. */
addEventOutput(output: EventBuffer): Terminable {
this.#outputs.push(output);
return { terminate: () => Arrays.remove(this.#outputs, output) };
}
/**
* Adds another event buffer that should receive update events.
*
* @param output - Buffer to append generated {@link UpdateEvent}s to.
* @returns Terminable that removes the buffer when invoked.
*/
addEventOutput(output: EventBuffer): Terminable {
this.#outputs.push(output)
return {terminate: () => Arrays.remove(this.#outputs, output)}
}

/**
* Walks the render blocks and emits {@link UpdateEvent}s at the configured
* {@link UpdateClockRate}. Only blocks flagged as transporting will trigger
* updates.
*
* @param blocks - Render blocks describing the current processing slice.
*/
process({blocks}: ProcessInfo): void {
blocks.forEach(({p0, p1, flags}, index: int) => {
if (!Bits.every(flags, BlockFlag.transporting)) {return}
for (const position of Fragmentor.iterate(p0, p1, UpdateClockRate)) {
const event: UpdateEvent = {type: "update-event", position}
this.#outputs.forEach(output => output.add(index, event))
}
})
}

/**
* Walks the render blocks and emits {@link UpdateEvent}s at the configured
* {@link UpdateClockRate}. Only blocks flagged as transporting will trigger
* updates.
*/
process({ blocks }: ProcessInfo): void {
blocks.forEach(({ p0, p1, flags }, index: int) => {
if (!Bits.every(flags, BlockFlag.transporting)) {
return;
}
for (const position of Fragmentor.iterate(p0, p1, UpdateClockRate)) {
const event: UpdateEvent = { type: "update-event", position };
this.#outputs.forEach((output) => output.add(index, event));
}
});
}

/** Diagnostic representation for debugging. */
toString(): string {
Expand Down
Loading