Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/aws-lambda/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ export interface AssembleEvent {
OutputS3Uri: string;
/** Output container format; drives file vs frame-dir handling. */
Format: DistributedFormat;
/**
* Optional exact-CFR re-encode at assemble time. When `true`, the final
* assembled video is re-encoded with `-fps_mode cfr -r <fps>` so the
* stream's `avg_frame_rate` matches the container's `r_frame_rate`
* exactly (and the file's duration is exact, not PTS-derived). Trade-off
* is ~2-5x the assemble wall-clock. mp4 only — webm / mov stream-copy
* paths already produce exact avg_frame_rate. Default `false` /
* unset preserves current `-c copy` behavior.
*/
Cfr?: boolean;
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/aws-lambda/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ async function handleAssemble(
? join(work, "output-frames")
: join(work, `output${formatExtension(event.Format)}`);

const result: AssembleResult = await primitive(planDir, chunkPaths, audioPath, finalOutput);
const result: AssembleResult = await primitive(planDir, chunkPaths, audioPath, finalOutput, {
cfr: event.Cfr === true,
});

if (event.Format === "png-sequence") {
const tarball = `${finalOutput}.tar.gz`;
Expand Down
179 changes: 179 additions & 0 deletions packages/producer/src/services/distributed/assemble.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ function buildPlanDir(
chunks: ChunkSliceJson[],
totalFrames: number,
hasAudio: boolean,
encoder: "libx264-software" | "libx265-software" = "libx264-software",
): string {
const planDir = mkdtempSync(join(runRoot, `plan-${format}-`));
mkdirSync(join(planDir, "meta"), { recursive: true });
Expand All @@ -60,6 +61,10 @@ function buildPlanDir(
"utf-8",
);
writeFileSync(join(planDir, "meta", "chunks.json"), JSON.stringify(chunks), "utf-8");
// Minimal encoder.json — assemble reads this when cfr=true to detect h265
// chunks (the cfr re-encode hardcodes libx264 and would silently transcode
// h265). Tests default to libx264 to match the in-production default.
writeFileSync(join(planDir, "meta", "encoder.json"), JSON.stringify({ encoder }), "utf-8");
return planDir;
}

Expand Down Expand Up @@ -321,6 +326,180 @@ describe("assemble()", () => {
TIMEOUT_MS,
);

it(
  "cfr:true re-encodes for exact avg_frame_rate matching r_frame_rate",
  async () => {
    if (!hasFfmpeg) {
      console.warn("[assemble.test] skipping cfr test — ffmpeg not available on this host");
      return;
    }

    // Opt-in CFR path: after the `-fps_mode cfr -r <fps>` re-encode the
    // stream's `avg_frame_rate` has to be the requested rational itself
    // rather than a fraction derived from PTS. The default (cfr=false)
    // path is exercised by the concat-copy tests earlier in this suite.
    const slices: ChunkSliceJson[] = [
      { index: 0, startFrame: 0, endFrame: 5 },
      { index: 1, startFrame: 5, endFrame: 10 },
    ];
    const planDir = buildPlanDir("mp4", slices, 10, false);

    const chunkFiles = [join(planDir, "chunk-0.mp4"), join(planDir, "chunk-1.mp4")];
    for (const chunkFile of chunkFiles) {
      makeMp4Chunk(chunkFile, 5);
    }

    const cfrOutput = join(planDir, "output-cfr.mp4");
    const assembled = await assemble(planDir, chunkFiles, null, cfrOutput, { cfr: true });

    expect(assembled.outputPath).toBe(cfrOutput);
    expect(existsSync(cfrOutput)).toBe(true);
    expect(assembled.framesEncoded).toBe(10);

    // Probe r_frame_rate and avg_frame_rate together: the contract of the
    // CFR re-encode is that the two agree and both equal the requested rate.
    const ffprobeArgs = [
      "-v",
      "error",
      "-select_streams",
      "v:0",
      "-show_entries",
      "stream=r_frame_rate,avg_frame_rate,duration",
      "-of",
      "json",
      cfrOutput,
    ];
    const ffprobe = spawnSync("ffprobe", ffprobeArgs, { stdio: "pipe" });
    expect(ffprobe.status).toBe(0);

    const report = JSON.parse(ffprobe.stdout.toString()) as {
      streams?: Array<{ r_frame_rate?: string; avg_frame_rate?: string; duration?: string }>;
    };
    const videoStream = report.streams?.[0];
    expect(videoStream).toBeDefined();
    expect(videoStream?.r_frame_rate).toBe("30/1");
    expect(videoStream?.avg_frame_rate).toBe("30/1");

    // 10 frames at 30 fps; allow 1 ms of slack on the probed duration.
    const wantSeconds = 10 / 30;
    const gotSeconds = Number(videoStream?.duration ?? 0);
    expect(Math.abs(gotSeconds - wantSeconds)).toBeLessThan(0.001);
  },
  TIMEOUT_MS,
);

it(
  "cfr:true rejects non-mp4 formats with a clear error",
  async () => {
    // Check for ffmpeg before touching the filesystem, like the sibling cfr
    // tests: both the chunk build below and the assemble step that runs
    // ahead of the cfr guard need a real ffmpeg.
    if (!hasFfmpeg) {
      console.warn("[assemble.test] skipping cfr-non-mp4 test — ffmpeg not available");
      return;
    }

    const chunks: ChunkSliceJson[] = [{ index: 0, startFrame: 0, endFrame: 5 }];
    // Stage a webm plan by hand. The png-sequence path short-circuits before
    // the cfr check, so webm (or mov) is needed to reach the runtime guard.
    // `meta/encoder.json` is deliberately not written: the format guard
    // throws before assemble reads it.
    const planDir = mkdtempSync(join(runRoot, "plan-webm-cfr-"));
    mkdirSync(join(planDir, "meta"), { recursive: true });
    writeFileSync(
      join(planDir, "plan.json"),
      JSON.stringify({
        planHash: "fake",
        totalFrames: 5,
        hasAudio: false,
        dimensions: { fpsNum: 30, fpsDen: 1, width: 160, height: 120, format: "webm" },
      }),
      "utf-8",
    );
    writeFileSync(join(planDir, "meta", "chunks.json"), JSON.stringify(chunks), "utf-8");

    const chunkPath = join(planDir, "chunk-0.webm");
    // Encode a real 5-frame VP9 chunk. The cfr guard sits after the
    // concat / single-chunk step, so that step must succeed on a valid
    // webm for the guard to be the thing that throws.
    const buildResult = spawnSync("ffmpeg", [
      "-v",
      "error",
      "-f",
      "lavfi",
      "-i",
      "testsrc=size=160x120:rate=30:duration=0.166666",
      "-c:v",
      "libvpx-vp9",
      "-row-mt",
      "1",
      "-deadline",
      "realtime",
      "-cpu-used",
      "8",
      "-g",
      "5",
      "-keyint_min",
      "5",
      "-pix_fmt",
      "yuv420p",
      "-vframes",
      "5",
      "-y",
      chunkPath,
    ]);
    if (buildResult.status !== 0) {
      // ffmpeg builds without libvpx-vp9 cannot produce the fixture; skip.
      console.warn(
        "[assemble.test] skipping cfr-non-mp4 test — libvpx-vp9 not available on this host",
      );
      return;
    }

    let caught: unknown;
    try {
      await assemble(planDir, [chunkPath], null, join(planDir, "out.webm"), { cfr: true });
    } catch (err) {
      caught = err;
    }
    expect(caught).toBeDefined();
    expect((caught as Error).message).toContain("cfr=true is only supported");
  },
  TIMEOUT_MS,
);

it(
  "cfr:true rejects h265 chunks with a clear error",
  async () => {
    if (!hasFfmpeg) {
      console.warn("[assemble.test] skipping cfr-h265 test — ffmpeg not available");
      return;
    }

    // The cfr pass always encodes with `-c:v libx264`, so running it over
    // h265 chunks would quietly hand back an h264 file. Assemble is expected
    // to throw instead. The plan's `meta/encoder.json` is staged as
    // `libx265-software`; the chunk itself comes from makeMp4Chunk, which
    // is fine because the guard keys off the encoder discriminant and
    // fires before any re-encode happens.
    const slices: ChunkSliceJson[] = [{ index: 0, startFrame: 0, endFrame: 5 }];
    const planDir = buildPlanDir("mp4", slices, 5, false, "libx265-software");

    const onlyChunk = join(planDir, "chunk-0.mp4");
    makeMp4Chunk(onlyChunk, 5);

    const destination = join(planDir, "out.mp4");
    const expectedFragment = `cfr=true is not yet supported with codec: "h265"`;

    let thrown: unknown;
    try {
      await assemble(planDir, [onlyChunk], null, destination, { cfr: true });
    } catch (failure) {
      thrown = failure;
    }

    expect(thrown).toBeDefined();
    expect((thrown as Error).message).toContain(expectedFragment);
  },
  TIMEOUT_MS,
);

it(
"merges png-sequence chunk directories with continuous global numbering",
() => {
Expand Down
102 changes: 98 additions & 4 deletions packages/producer/src/services/distributed/assemble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,29 @@ export async function assemble(
chunkPaths: readonly string[],
audioPath: string | null,
outputPath: string,
options?: { logger?: ProducerLogger; abortSignal?: AbortSignal },
options?: {
logger?: ProducerLogger;
abortSignal?: AbortSignal;
/**
* Opt-in exact-CFR re-encode. When `true`, the assembled video is
* re-encoded once at the end of the concat/single-chunk step with
* `-fps_mode cfr -r <fps>` so the stream-level `avg_frame_rate`
* matches the container's `r_frame_rate` exactly (and the file's
* duration lands on the requested `frameCount / fps` to ms
* precision, with no PTS-derived drift). Trade-off: ~2-5x the
* stitch time for a 60s 1080p clip plus second-generation H.264
* quality loss (negligible at `-crf 18` but non-zero). Default
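* `false` preserves the existing `-c copy` behavior. mp4 / libx264
* only: `assemble` throws when `cfr` is `true` and the plan format is
* not mp4, or when `meta/encoder.json` reports `libx265-software`
* (the re-encode would silently transcode h265 to h264). webm / mov
* stream-copy paths don't exhibit the same avg-frame-rate drift, so
* they never need the re-encode.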
*/
cfr?: boolean;
},
): Promise<AssembleResult> {
const start = Date.now();
const log = options?.logger ?? defaultLogger;
const abortSignal = options?.abortSignal;
const cfr = options?.cfr === true;

// ── 1. Validate planDir manifest matches chunkPaths shape ──────────────
const planJsonPath = join(planDir, "plan.json");
Expand Down Expand Up @@ -193,12 +211,88 @@ export async function assemble(
}
}

// ── 2c. Optional exact-CFR re-encode ──────────────────────────────────
// The concat / single-chunk step produces a stream-copy intermediate
// whose container `r_frame_rate` is exact but whose stream-level
// `avg_frame_rate` stays PTS-derived (concat-copy carries each chunk's
// original PTS unmodified). For consumers that strict-check
// `avg_frame_rate` or ms-precision duration (broadcast workflows,
// frame-accurate compositors, some third-party transcoders), an
// opt-in re-encode with `-fps_mode cfr -r <fps>` lands the stream's
// avg-frame-rate on the requested rational exactly. Restricted to
// mp4 / libx264 — webm and mov go through their own stream-copy
// paths that don't exhibit the same avg-frame-rate drift, and h265
// mp4 would silently transcode to h264 under the hardcoded
// `-c:v libx264` re-encode (a typed throw is preferable to silent
// codec loss).
let postConcatPath = concatOutputPath;
if (cfr) {
if (plan.dimensions.format !== "mp4") {
throw new Error(
`[assemble] cfr=true is only supported for format="mp4" (got ` +
`"${plan.dimensions.format}"). Stream-copy paths for webm and mov ` +
`already produce exact avg_frame_rate; cfr re-encode is not needed.`,
);
}
// Read `meta/encoder.json` to detect the chunk encoder. The cfr
// re-encode hardcodes `-c:v libx264`; pairing it with h265 chunks
// would silently transcode them to h264. Throw a typed error so the
// caller surfaces the conflict instead of producing a wrong-codec
// deliverable.
const encoderJsonPath = join(planDir, "meta", "encoder.json");
if (!existsSync(encoderJsonPath)) {
throw new Error(`[assemble] planDir missing meta/encoder.json: ${encoderJsonPath}`);
}
const encoderJson = JSON.parse(readFileSync(encoderJsonPath, "utf-8")) as {
encoder?: string;
};
if (encoderJson.encoder === "libx265-software") {
throw new Error(
`[assemble] cfr=true is not yet supported with codec: "h265". The ` +
`cfr re-encode pass uses libx264 and would silently transcode the ` +
`h265 chunks. Either disable cfr or render with codec: "h264".`,
);
}
const cfrOutputPath = join(workDir, `cfr.${plan.dimensions.format}`);
const cfrArgs = [
"-i",
concatOutputPath,
"-c:v",
"libx264",
"-preset",
"medium",
"-crf",
"18",
"-pix_fmt",
"yuv420p",
"-fps_mode",
"cfr",
"-r",
fpsArg,
"-y",
cfrOutputPath,
];
const cfrResult = await runFfmpeg(cfrArgs, { signal: abortSignal });
if (!cfrResult.success) {
throw new Error(
`[assemble] ffmpeg cfr re-encode failed (exit ${cfrResult.exitCode}): ` +
`${cfrResult.stderr.slice(-400)}`,
);
}
postConcatPath = cfrOutputPath;
log.info("[assemble] cfr re-encode applied", {
format: plan.dimensions.format,
fpsNum: plan.dimensions.fpsNum,
fpsDen: plan.dimensions.fpsDen,
});
}

// ── 3. Audio: pad-or-trim then mux ────────────────────────────────────
let audioForMux: string | null = null;
if (audioPath !== null && existsSync(audioPath)) {
const paddedAudioPath = join(workDir, "audio-padded.aac");
const padTrimResult = await padOrTrimAudioToVideoFrameCount({
videoPath: concatOutputPath,
videoPath: postConcatPath,
audioPath,
outputPath: paddedAudioPath,
});
Expand All @@ -218,10 +312,10 @@ export async function assemble(
// because it operates on a `RenderJob` and emits `updateJobStatus`
// payloads — the distributed activity has no job to thread through.
const muxOutputPath =
audioForMux !== null ? join(workDir, `mux.${plan.dimensions.format}`) : concatOutputPath;
audioForMux !== null ? join(workDir, `mux.${plan.dimensions.format}`) : postConcatPath;
if (audioForMux !== null) {
const muxResult = await muxVideoWithAudio(
concatOutputPath,
postConcatPath,
audioForMux,
muxOutputPath,
abortSignal,
Expand Down
14 changes: 14 additions & 0 deletions packages/producer/src/services/distributed/plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ export interface DistributedRenderConfig {
/** HDR is not supported in distributed mode; `force-hdr` trips a `FormatNotSupportedInDistributedError`. Defaults to `force-sdr`. */
hdrMode?: "auto" | "force-sdr";

/**
* Opt-in exact-CFR re-encode at the assemble stage. When `true`, the
* stitched output is re-encoded once with `-fps_mode cfr -r <fps>` so
* the stream-level `avg_frame_rate` matches the container's
* `r_frame_rate` exactly (and the file duration is exact, not
* PTS-derived). Useful for downstream consumers that strict-check
* `avg_frame_rate` or ms-precision duration. Default `false` retains
* the existing `-c copy` stitch path, which is faster and lossless.
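* mp4 / h264 only — `assemble` throws if `cfr` is set for any other
* format or for `libx265-software` chunks; webm / mov stream-copy
* paths already produce exact avg_frame_rate. Consumed by `assemble`;
* does not affect `planHash` (chunks render identically; only the
* final stitch step differs).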
*/
cfr?: boolean;

logger?: ProducerLogger;
/** Optional engine config override (env vars are not read when provided). */
producerConfig?: EngineConfig;
Expand Down
Loading