diff --git a/Makefile b/Makefile
index 310cf6b..73c540b 100644
--- a/Makefile
+++ b/Makefile
@@ -26,8 +26,12 @@ dist/libav-wasm.js:
 		-s INITIAL_MEMORY=150mb \
 		-s TOTAL_MEMORY=125mb \
 		-s STACK_SIZE=50mb \
+		-s ALLOW_MEMORY_GROWTH=1 \
 		-s ASYNCIFY \
 		-s MODULARIZE=1 \
+		-g \
+		-gsource-map \
+		--source-map-base http://localhost:1234/dist/ \
 		-s ASSERTIONS=2 \
 		-lavcodec -lavformat -lavfilter -lavdevice -lswresample -lswscale -lavutil -lm -lx264 \
 		-o dist/libav.js \
diff --git a/package-lock.json b/package-lock.json
index 1b65b32..5a8a946 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -16,7 +16,7 @@
         "@rollup/plugin-commonjs": "^24.0.1",
         "concurrently": "^7.6.0",
         "copyfiles": "^2.4.1",
-        "nodemon": "^2.0.20",
+        "nodemon": "^2.0.22",
         "shx": "^0.3.4",
         "typescript": "^4.9.4",
         "vite": "^4.0.4"
@@ -820,9 +820,9 @@
       }
     },
     "node_modules/nodemon": {
-      "version": "2.0.20",
-      "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-2.0.20.tgz",
-      "integrity": "sha512-Km2mWHKKY5GzRg6i1j5OxOHQtuvVsgskLfigG25yTtbyfRGn/GNvIbRyOf1PSCKJ2aT/58TiuUsuOU5UToVViw==",
+      "version": "2.0.22",
+      "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-2.0.22.tgz",
+      "integrity": "sha512-B8YqaKMmyuCO7BowF1Z1/mkPqLk6cs/l63Ojtd6otKjMx47Dq1utxfRxcavH1I7VSaL8n5BUaoutadnsX3AAVQ==",
       "dev": true,
       "dependencies": {
         "chokidar": "^3.5.2",
diff --git a/package.json b/package.json
index 68c69be..393c441 100644
--- a/package.json
+++ b/package.json
@@ -8,12 +8,13 @@
   "types": "build/index.d.ts",
   "type": "module",
   "scripts": {
-    "dev": "npm i ./dist && concurrently \"npm run dev-wasm\" \"npm run dev-web\"",
+    "dev": "npm i ./dist && concurrently \"npm run dev-wasm\" \"npm run dev-web\" \"npm run worker-build-dev\"",
     "dev-wasm": "nodemon -e cpp --exec \"npm run make-docker\"",
     "build-worker": "vite build --config vite-worker.config.ts",
     "make-docker": "docker-compose run libav-wasm make",
    "dev-web": "vite --port 1234",
     "vite-build": "vite build",
+    "worker-build-dev": "nodemon -e ts --watch src/worker --exec \"npm run build-worker\"",
     "build": "npm run copy-package && npm run make-docker && npm i ./dist && vite build && npm run build-worker && npm run types && npm run copy-wasm && npm remove libav",
     "types": "tsc",
     "copy-package": "copyfiles -u 2 ./src/build-config/package.json dist",
@@ -24,7 +25,7 @@
     "@rollup/plugin-commonjs": "^24.0.1",
     "concurrently": "^7.6.0",
     "copyfiles": "^2.4.1",
-    "nodemon": "^2.0.20",
+    "nodemon": "^2.0.22",
     "shx": "^0.3.4",
     "typescript": "^4.9.4",
     "vite": "^4.0.4"
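Note: the public API now takes two I/O callbacks instead of `read`/`seek`: `randomRead` for positioned reads (header probing and post-seek re-reads) and `getStream` for sequential playback reads. A minimal consumer sketch, modelled on the updated src/test.ts further down; the import path and the `videoUrl`/`contentLength` values are illustrative assumptions, not part of this patch:

```ts
import { makeTransmuxer } from './index' // path is an assumption

const videoUrl = '../video5.mkv'   // example file, as in src/test.ts
const contentLength = 100_000_000  // assumed; normally read from Content-Range

const remuxer = await makeTransmuxer({
  publicPath: '/dist/',
  workerUrl: '/dist/worker.js',
  bufferSize: 2_500_000,
  length: contentLength,
  // positioned read: one Range request of exactly `size` bytes
  randomRead: (offset, size) =>
    fetch(videoUrl, { headers: { Range: `bytes=${offset}-${Math.min(offset + size, contentLength) - 1}` } })
      .then(res => res.arrayBuffer()),
  // sequential read: an open-ended Range request exposed as a ReadableStream
  getStream: async offset => {
    const res = await fetch(videoUrl, { headers: { Range: `bytes=${offset}-` } })
    return res.body!
  },
  subtitle: () => {},
  attachment: () => {},
  write: () => {}
})
```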
diff --git a/src/index.ts b/src/index.ts
index 0758dc5..a7d5366 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,23 +1,15 @@
 import type { Resolvers as WorkerResolvers } from './worker'
-import PQueue from 'p-queue'
 import { call } from 'osra'
 
-import { SEEK_FLAG, SEEK_WHENCE_FLAG } from './utils'
-
-export {
-  SEEK_FLAG,
-  SEEK_WHENCE_FLAG
-}
-
 export type MakeTransmuxerOptions = {
   /** Path that will be used to locate the .wasm file imported from the worker */
   publicPath: string
   /** Path that will be used to locate the javascript worker file */
   workerUrl: string
   workerOptions?: WorkerOptions
-  read: (offset: number, size: number) => Promise<ArrayBuffer>
-  seek: (currentOffset: number, offset: number, whence: SEEK_WHENCE_FLAG) => Promise<number>
+  randomRead: (offset: number, size: number) => Promise<ArrayBuffer>
+  getStream: (offset: number) => Promise<ReadableStream<Uint8Array>>
   subtitle: (title: string, language: string, data: string) => Promise<void> | void
   attachment: (filename: string, mimetype: string, buffer: ArrayBuffer) => Promise<void> | void
   write: (params: {
@@ -77,8 +69,8 @@ export const makeTransmuxer = async ({
   publicPath,
   workerUrl,
   workerOptions,
-  read: _read,
-  seek: _seek,
+  randomRead: _randomRead,
+  getStream: _getStream,
   write: _write,
   attachment,
   subtitle: _subtitle,
@@ -99,15 +91,16 @@ export const makeTransmuxer = async ({
 
   const target = call(worker)
 
-  const apiQueue = new PQueue()
-
-  const addTask = <T extends (...args: any) => any>(func: T) =>
-    apiQueue.add<Awaited<ReturnType<T>>>(func)
-
   const subtitles = new Map()
-  let lastChunk: Chunk | undefined
 
-  const { init: _workerInit, destroy: _workerDestroy, process: _workerProcess, seek: _workerSeek, getInfo: _getInfo } =
+  let currentStream: ReadableStream | undefined
+  let reader: ReadableStreamDefaultReader | undefined
+
+  let streamResultPromiseResolve: (value: { value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }) => void
+  let streamResultPromiseReject: (reason?: any) => void
+  let streamResultPromise: Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>
+
+  const { init: workerInit, destroy: workerDestroy, read: workerRead, seek: workerSeek, getInfo: getInfo } =
     await target(
       'init',
       {
@@ -155,82 +148,94 @@ export const makeTransmuxer = async ({
           _subtitle(subtitle.title, subtitle.language, subtitleString)
         },
         attachment: async (filename, mimetype, buffer) => attachment(filename, mimetype, buffer),
-        read: (offset, bufferSize) => _read(offset, bufferSize),
-        seek: (currentOffset, offset, whence) => _seek(currentOffset, offset, whence),
-        write: async ({
+        randomRead: (offset, bufferSize) => _randomRead(offset, bufferSize),
+        streamRead: async (offset: number) => {
+          if (!currentStream) {
+            currentStream = await _getStream(offset)
+            reader = currentStream.getReader()
+          }
+
+          streamResultPromise = new Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>((resolve, reject) => {
+            streamResultPromiseResolve = resolve
+            streamResultPromiseReject = reject
+          })
+
+          const tryReading = (): Promise<void> | undefined =>
+            reader
+              ?.read()
+              .then(result => ({
+                value: result.value?.buffer,
+                done: result.value === undefined,
+                cancelled: false
+              }))
+              .then(async (result) => {
+                if (result.done) {
+                  reader?.cancel()
+                  if (offset >= length) {
+                    return streamResultPromiseResolve(result)
+                  }
+                  currentStream = await _getStream(offset)
+                  reader = currentStream.getReader()
+                  return tryReading()
+                }
+
+                return streamResultPromiseResolve(result)
+              })
+              .catch((err) => streamResultPromiseReject(err))
+
+          tryReading()
+
+          return (
+            streamResultPromise
+              .then((value) => ({
+                value: value.value,
+                done: value.done,
+                cancelled: false
+              }))
+              .catch(err => {
+                console.error(err)
+                return {
+                  value: undefined,
+                  done: false,
+                  cancelled: true
+                }
+              })
+          )
+        },
+        clearStream: async () => {
+          reader?.cancel()
+          currentStream = undefined
+          reader = undefined
+        },
+        write: ({
           isHeader,
           offset,
           arrayBuffer,
           position,
           pts,
           duration
-        }) => {
-          const chunk = {
-            isHeader,
-            offset,
-            buffer: new Uint8Array(arrayBuffer),
-            pts,
-            duration,
-            pos: position
-          }
-
-          if (!isHeader) {
-            lastChunk = chunk
-            processBufferChunks.push(chunk)
-          }
-
-          await _write(chunk)
-        }
+        }) => _write({
+          isHeader,
+          offset,
+          buffer: new Uint8Array(arrayBuffer),
+          pts,
+          duration,
+          pos: position
+        })
       }
     )
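Note: `streamRead` above bridges the worker's request/response calls onto a pull-based `ReadableStream` reader. Each call arms a fresh promise whose `resolve`/`reject` are captured in the outer `streamResultPromiseResolve`/`streamResultPromiseReject` variables, so `tryReading` can transparently reopen the stream when a Range response ends before the file does and still settle the same promise. The same pattern isolated into a generic helper; a sketch only, this helper is not part of the codebase:

```ts
// Generic "deferred" sketch of the resolve-handoff used by streamRead above.
type Deferred<T> = {
  promise: Promise<T>
  resolve: (value: T) => void
  reject: (reason?: unknown) => void
}

const deferred = <T>(): Deferred<T> => {
  let resolve!: (value: T) => void
  let reject!: (reason?: unknown) => void
  const promise = new Promise<T>((res, rej) => {
    resolve = res
    reject = rej
  })
  return { promise, resolve, reject }
}

// usage: arm it here, let some other callback settle it later
const result = deferred<{ value: ArrayBuffer | undefined, done: boolean }>()
// ...elsewhere: result.resolve({ value: new ArrayBuffer(0), done: true })
await result.promise
```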
 
-  const workerQueue = new PQueue({ concurrency: 1 })
-
-  const addWorkerTask = <T extends (...args: any) => any>(func: T) =>
-    (...args: Parameters<T>) =>
-      workerQueue.add<Awaited<ReturnType<T>>>(() => func(...args))
-
-  const workerInit = addWorkerTask(_workerInit)
-  const workerDestroy = addWorkerTask(_workerDestroy)
-  const workerProcess = addWorkerTask(_workerProcess)
-  const workerSeek = addWorkerTask(_workerSeek)
-  const getInfo = addWorkerTask(_getInfo)
-
-  let processBufferChunks: Chunk[] = []
-
   const result = {
-    init: () => addTask(async () => {
-      processBufferChunks = []
-      await workerInit()
-    }),
+    init: () => workerInit(),
     destroy: (destroyWorker = false) => {
       if (destroyWorker) {
         worker.terminate()
         return
       }
-      return addTask(() => workerDestroy())
-    },
-    process: (timeToProcess: number) => addTask(async () => {
-      processBufferChunks = []
-      await workerProcess(timeToProcess)
-      const writtenChunks = processBufferChunks
-      processBufferChunks = []
-      return writtenChunks
-    }),
-    seek: (time: number) => {
-      return addTask(async () => {
-        // if (lastChunk && (lastChunk.pts > time)) {
-        //   await workerDestroy()
-        //   processBufferChunks = []
-        //   await workerInit()
-        // }
-        processBufferChunks = []
-        await workerSeek(
-          Math.max(0, time) * 1000,
-          SEEK_FLAG.NONE
-        )
-      })
+      return workerDestroy()
     },
+    read: () => workerRead(),
+    seek: (time: number) => workerSeek(Math.max(0, time) * 1000),
     getInfo: () => getInfo() as Promise<{ input: MediaInfo, output: MediaInfo }>
   }
diff --git a/src/main.cpp b/src/main.cpp
index 47f0807..ce135b5 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,6 +1,7 @@
 #include <iostream>
 #include <emscripten/bind.h>
 #include <emscripten/val.h>
+#include <sstream>
 
 using namespace emscripten;
 using namespace std;
@@ -11,6 +12,14 @@ extern "C" {
 #include <libavformat/avformat.h>
 };
 
+template <typename T>
+inline std::string to_string (const T& t)
+{
+  std::stringstream ss;
+  ss << t;
+  return ss.str();
+}
+
 int main() {
   return 0;
 }
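Note: the `to_string` helper added here is what lets byte offsets cross the embind boundary as decimal strings instead of doubles; `pb->pos` is 64-bit, and JS numbers stay exact only up to 2^53 - 1. The worker (src/worker/index.ts below) converts them back with `Number(...)`. A sketch of the receiving side with an explicit safety check; the check itself is an illustration, not in the diff:

```ts
// Offsets arrive as decimal strings produced by the C++ to_string helper.
const parseOffset = (offset: string): number => {
  const n = Number(offset)
  if (!Number.isSafeInteger(n)) throw new RangeError(`offset ${offset} is not exactly representable`)
  return n
}

parseOffset('123456789') // 123456789
```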
@@ -35,7 +44,7 @@ extern "C" {
   static int readOutputFunction(void* opaque, uint8_t* buf, int buf_size);
   static int64_t seekFunction(void* opaque, int64_t offset, int whence);
 
-  class Transmuxer {
+  class Remuxer {
   public:
     AVIOContext* input_avio_context;
     AVIOContext* output_avio_context;
@@ -63,19 +72,32 @@ extern "C" {
     std::string video_mime_type;
     std::string audio_mime_type;
 
-    val read = val::undefined();
+    val randomRead = val::undefined();
+
+    val streamRead = val::undefined();
+    val currentReadStream = val::undefined();
+    val clearStream = val::undefined();
+
     val attachment = val::undefined();
     val subtitle = val::undefined();
     val write = val::undefined();
-    val seek = val::undefined();
+    val flush = val::undefined();
     val error = val::undefined();
 
+    double currentOffset = 0;
+
     bool first_init = true;
     bool initializing = false;
     vector<std::string> init_buffers;
     int init_buffer_count = 0;
 
+    bool first_seek = true;
+    bool seeking = false;
+
+    vector<std::string> seek_buffers;
+    int seek_buffer_count = 0;
+
     val promise = val::undefined();
     // val promise = val::global("Promise")["resolve"]().as<val>();
     // val promise = val::global("Promise").get("resolve").as<val>();
@@ -88,15 +110,17 @@ extern "C" {
       printf("\n");
     }
 
-    Transmuxer(val options) {
+    Remuxer(val options) {
       promise = options["promise"];
       input_length = options["length"].as<int>();
       buffer_size = options["bufferSize"].as<int>();
-      read = options["read"];
+      randomRead = options["randomRead"];
+      streamRead = options["streamRead"];
+      clearStream = options["clearStream"];
       attachment = options["attachment"];
       subtitle = options["subtitle"];
       write = options["write"];
-      seek = options["seek"];
+      flush = options["flush"];
       error = options["error"];
     }
@@ -209,14 +233,22 @@ extern "C" {
       return mime_type;
     }
 
-    void init (bool _first_init) {
+    void init () {
       initializing = true;
       init_buffer_count = 0;
+      seek_buffer_count = 0;
       int res;
       is_header = true;
       duration = 0;
       first_frame = true;
+      prev_duration = 0;
+      prev_pts = 0;
+      prev_pos = 0;
+      duration = 0;
+      pts = 0;
+      pos = 0;
+
       input_avio_context = nullptr;
       input_format_context = avformat_alloc_context();
       // output_format_context = avformat_alloc_context();
@@ -417,27 +449,25 @@ extern "C" {
         return;
       }
       if (first_init) {
-        write(
-          static_cast<long>(input_format_context->pb->pos),
-          NULL,
-          is_header,
-          true,
-          0,
+        flush(
+          to_string(input_format_context->pb->pos),
+          to_string(0),
           0,
           0
         );
+        is_flushing = false;
       }
       initializing = false;
       first_init = false;
     }
 
-    void process(double time_to_process) {
+    void read() {
       int res;
-      double start_process_pts = 0;
+      bool flushed = false;
 
       // loop through the packet frames until we reach the processed size
-      while (start_process_pts == 0 || (pts < (start_process_pts + time_to_process))) {
+      while (!flushed) {
         AVPacket* packet = av_packet_alloc();
 
         if ((res = av_read_frame(input_format_context, packet)) < 0) {
@@ -445,12 +475,9 @@ extern "C" {
           avio_flush(output_format_context->pb);
           is_flushing = true;
           av_write_trailer(output_format_context);
-          write(
-            static_cast<long>(input_format_context->pb->pos),
-            NULL,
-            is_header,
-            true,
-            pos,
+          flush(
+            to_string(input_format_context->pb->pos),
+            to_string(pos),
             pts,
             duration
           );
@@ -505,17 +532,13 @@ extern "C" {
         // Rescale the PTS/DTS from the input time base to the output time base
         av_packet_rescale_ts(packet, in_stream->time_base, out_stream->time_base);
 
-        if (!start_process_pts) {
-          start_process_pts = packet->pts * av_q2d(out_stream->time_base);
-        }
-
         if (in_stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
-          if (!start_process_pts) {
-            start_process_pts = packet->pts * av_q2d(out_stream->time_base);
-          }
+          // printf("pts: %f, prev duration: %f, duration: %f\n", packet->pts * av_q2d(out_stream->time_base), duration, packet->duration * av_q2d(out_stream->time_base));
           duration += packet->duration * av_q2d(out_stream->time_base);
         }
 
+        bool empty_flush = false;
+
         // Set needed pts/pos/duration needed to calculate the real timestamps
         if (is_keyframe) {
           bool was_header = is_header;
@@ -523,10 +546,18 @@ extern "C" {
             is_header = false;
           } else {
             is_flushing = true;
+            empty_flush = flush(
+              to_string(input_format_context->pb->pos),
+              to_string(prev_pos),
+              prev_pts,
+              prev_duration
+            ).await().as<bool>();
+            flushed = true;
           }
 
           prev_duration = duration;
           prev_pts = pts;
+          // printf("pts: %f, duration: %f\n", prev_pts, duration);
           prev_pos = pos;
 
           duration = 0;
@@ -541,6 +572,21 @@ extern "C" {
           continue;
         }
 
+        if (is_flushing && empty_flush) {
+          empty_flush = flush(
+            to_string(input_format_context->pb->pos),
+            to_string(prev_pos),
+            prev_pts,
+            prev_duration
+          ).await().as<bool>();
+          is_flushing = false;
+          flushed = true;
+
+          if (empty_flush) {
+            flushed = false;
+          }
+        }
+
         // free packet
         av_packet_unref(packet);
         av_packet_free(&packet);
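Note: `process(time_to_process)` becomes `read()`: instead of demuxing for a caller-chosen time window, the loop now runs until one keyframe-delimited chunk has been flushed (`while (!flushed)`), so each call yields exactly one chunk. Consumption therefore becomes pull-based; a hedged sketch of a consumer loop (the real one is `pull` in src/test.ts below; the `Chunk` field list is assumed from the `write` callback parameters):

```ts
type Chunk = { isHeader: boolean, offset: number, buffer: Uint8Array, pts: number, duration: number, pos: number }

// Pull N keyframe-delimited chunks ahead of the playhead.
const bufferAhead = async (remuxer: { read: () => Promise<Chunk> }, count: number): Promise<Chunk[]> => {
  const chunks: Chunk[] = []
  for (let i = 0; i < count; i++) {
    // each read() drives the demux loop until the next flush
    chunks.push(await remuxer.read())
  }
  return chunks
}
```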
@@ -574,11 +620,12 @@ extern "C" {
       return output_avio_context->pos;
     }
 
-    int _seek(int timestamp, int flags) {
+    int _seek(int timestamp) {
+      seeking = true;
       destroy();
-      init(false);
+      init();
 
-      process(0.01);
+      clearStream().await();
 
       int res;
       prev_duration = 0;
@@ -587,9 +634,11 @@ extern "C" {
       duration = 0;
       pts = 0;
       pos = 0;
-      if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, flags)) < 0) {
+      if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, AVSEEK_FLAG_BACKWARD)) < 0) {
         printf("ERROR: could not seek frame | %s \n", av_err2str(res));
       }
+      seeking = false;
+      first_seek = false;
 
       return 0;
     }
@@ -618,11 +667,20 @@ extern "C" {
 
   // Seek callback called by AVIOContext
   static int64_t seekFunction(void* opaque, int64_t offset, int whence) {
-    Transmuxer &remuxObject = *reinterpret_cast<Transmuxer*>(opaque);
-    emscripten::val &seek = remuxObject.seek;
-    // call the JS seek function
-    double result = seek(static_cast<long>(offset), whence).await().as<double>();
-    return result;
+    Remuxer &remuxObject = *reinterpret_cast<Remuxer*>(opaque);
+    if (whence == SEEK_CUR) {
+      return remuxObject.currentOffset + offset;
+    }
+    if (whence == SEEK_END) {
+      return -1;
+    }
+    if (whence == SEEK_SET) {
+      return offset;
+    }
+    if (whence == AVSEEK_SIZE) {
+      return remuxObject.input_length;
+    }
+    return -1;
   }
 
   // If emscripten asynchify ever start working for libraries callbacks,
 
   // Read callback called by AVIOContext
   static int readFunction(void* opaque, uint8_t* buf, int buf_size) {
-    Transmuxer &remuxObject = *reinterpret_cast<Transmuxer*>(opaque);
-    emscripten::val &read = remuxObject.read;
+    Remuxer &remuxObject = *reinterpret_cast<Remuxer*>(opaque);
     std::string buffer;
+
+    if (remuxObject.initializing) {
+      emscripten::val &randomRead = remuxObject.randomRead;
       if (remuxObject.first_init) {
-        val res = read(static_cast<long>(remuxObject.input_format_context->pb->pos), buf_size).await();
-        buffer = res["buffer"].as<std::string>();
+        buffer =
+          randomRead(
+            to_string(remuxObject.input_format_context->pb->pos),
+            buf_size
+          )
+          .await()
+          .as<std::string>();
         remuxObject.init_buffers.push_back(buffer);
       } else {
         remuxObject.promise.await();
         buffer = remuxObject.init_buffers[remuxObject.init_buffer_count];
         remuxObject.init_buffer_count++;
       }
+    } else if(remuxObject.seeking) {
+      emscripten::val &randomRead = remuxObject.randomRead;
+      if (remuxObject.first_seek) {
+        buffer =
+          randomRead(
+            to_string(remuxObject.input_format_context->pb->pos),
+            buf_size
+          )
+          .await()
+          .as<std::string>();
+        remuxObject.seek_buffers.push_back(buffer);
+      } else {
+        remuxObject.promise.await();
+        buffer = remuxObject.seek_buffers[remuxObject.seek_buffer_count];
+        remuxObject.seek_buffer_count++;
+      }
     } else {
-      // call the JS read function and get its result as
-      // {
-      //   buffer: Uint8Array,
-      //   size: int
-      // }
-      val res = read(static_cast<long>(remuxObject.input_format_context->pb->pos), buf_size).await();
-      buffer = res["buffer"].as<std::string>();
+      emscripten::val result =
+        remuxObject
+          .streamRead(
+            to_string(remuxObject.input_format_context->pb->pos),
+            buf_size
+          )
+          .await();
+      bool is_cancelled = result["cancelled"].as<bool>();
+      if (is_cancelled) {
+        return AVERROR_EXIT;
+      }
+      bool is_done = result["done"].as<bool>();
+      if (is_done) {
+        return AVERROR_EOF;
+      }
+      buffer = result["value"].as<std::string>();
     }
 
     int buffer_size = buffer.size();
@@ -659,13 +748,15 @@ extern "C" {
     }
     // copy the result buffer into AVIO's buffer
     memcpy(buf, (uint8_t*)buffer.c_str(), buffer_size);
+
+    remuxObject.currentOffset = remuxObject.currentOffset + buffer_size;
     // If result buffer size is 0, we reached the end of the file
     return buffer_size;
   }
 
   // Write callback called by AVIOContext
   static int writeFunction(void* opaque, uint8_t* buf, int buf_size) {
-    Transmuxer &remuxObject = *reinterpret_cast<Transmuxer*>(opaque);
+    Remuxer &remuxObject = *reinterpret_cast<Remuxer*>(opaque);
 
     if (remuxObject.initializing && !remuxObject.first_init) {
       return buf_size;
     }
@@ -675,24 +766,14 @@ extern "C" {
     // call the JS write function
     write(
-      static_cast<long>(remuxObject.input_format_context->pb->pos),
       emscripten::val(
         emscripten::typed_memory_view(
           buf_size,
           buf
         )
-      ),
-      remuxObject.is_header,
-      remuxObject.is_flushing,
-      remuxObject.prev_pos,
-      remuxObject.prev_pts,
-      remuxObject.prev_duration
+      )
     ).await();
 
-    if (remuxObject.is_flushing) {
-      remuxObject.is_flushing = false;
-    }
-
     return buf_size;
   }
 
@@ -710,12 +791,12 @@ extern "C" {
       .field("input", &InfoObject::input)
       .field("output", &InfoObject::output);
 
-    class_<Transmuxer>("Transmuxer")
+    class_<Remuxer>("Remuxer")
       .constructor<emscripten::val>()
-      .function("init", &Transmuxer::init)
-      .function("process", &Transmuxer::process)
-      .function("destroy", &Transmuxer::destroy)
-      .function("seek", &Transmuxer::_seek)
-      .function("getInfo", &Transmuxer::getInfo);
+      .function("init", &Remuxer::init)
+      .function("read", &Remuxer::read)
+      .function("destroy", &Remuxer::destroy)
+      .function("seek", &Remuxer::_seek)
+      .function("getInfo", &Remuxer::getInfo);
   }
 }
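Note: `seekFunction` no longer round-trips into JS. Every `whence` case is answered natively: `SEEK_CUR` from the tracked `currentOffset`, `SEEK_SET` verbatim, `AVSEEK_SIZE` from `input_length`, and `SEEK_END` is reported as unsupported. This is exactly the logic every consumer previously had to supply through the `seek` callback (see the block removed from src/test.ts below); in TS terms:

```ts
import { SEEK_WHENCE_FLAG } from './utils'

// Reference-only TS mirror of the new native seekFunction.
const seekFunction = (currentOffset: number, offset: number, whence: SEEK_WHENCE_FLAG, length: number): number => {
  if (whence === SEEK_WHENCE_FLAG.SEEK_CUR) return currentOffset + offset
  if (whence === SEEK_WHENCE_FLAG.SEEK_END) return -1 // unsupported
  if (whence === SEEK_WHENCE_FLAG.SEEK_SET) return offset
  if (whence === SEEK_WHENCE_FLAG.AVSEEK_SIZE) return length
  return -1
}
```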
diff --git a/src/test.ts b/src/test.ts
index 9ada331..b4f93c9 100644
--- a/src/test.ts
+++ b/src/test.ts
@@ -1,7 +1,7 @@
 // @ts-ignore
 import PQueue from 'p-queue'
 
-import { SEEK_WHENCE_FLAG, queuedDebounceWithLastCall } from './utils'
+import { queuedDebounceWithLastCall, toBufferedStream, toStreamChunkSize } from './utils'
 import { makeTransmuxer } from '.'
 
 type Chunk = {
@@ -12,12 +12,9 @@ type Chunk = {
   pos: number
 }
 
-const BUFFER_SIZE = 5_000_000
+const BUFFER_SIZE = 2_500_000
 const VIDEO_URL = '../video5.mkv'
 // const VIDEO_URL = '../spidey.mkv'
-const PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS = 10
-const POST_SEEK_NEEDED_BUFFERS_IN_SECONDS = 30
-const POST_SEEK_REMOVE_BUFFERS_IN_SECONDS = 60
 
 export default async function saveFile(plaintext: ArrayBuffer, fileName: string, fileType: string) {
   return new Promise((resolve, reject) => {
@@ -64,6 +61,8 @@ export default async function saveFile(plaintext: ArrayBuffer, fileName: string,
   })
 }
 
+
+
 fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } })
   .then(async ({ headers, body }) => {
     if (!body) throw new Error('no body')
@@ -73,52 +72,42 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } })
       ? Number(contentRangeContentLength)
       : Number(headers.get('Content-Length'))
 
-    let headerChunk: Chunk
+    // let headerChunk: Chunk
     let ended = false
-    let chunks: Chunk[] = []
 
     const workerUrl2 = new URL('../build/worker.js', import.meta.url).toString()
     const blob = new Blob([`importScripts(${JSON.stringify(workerUrl2)})`], { type: 'application/javascript' })
     const workerUrl = URL.createObjectURL(blob)
 
-    const transmuxer = await makeTransmuxer({
+    const remuxer = await makeTransmuxer({
       publicPath: new URL('/dist/', new URL(import.meta.url).origin).toString(),
       workerUrl,
       bufferSize: BUFFER_SIZE,
       length: contentLength,
-      read: (offset, size) =>
-        // console.log('read', offset, size) ||
-        offset >= Math.min(offset + size, contentLength)
-          ? Promise.resolve(new ArrayBuffer(0))
-          : (
-            fetch(
-              VIDEO_URL,
-              {
-                headers: {
-                  Range: `bytes=${offset}-${Math.min(offset + size, contentLength) - 1}`
-                }
-              }
-            )
-              .then(res => res.arrayBuffer())
-          ),
-      seek: async (currentOffset, offset, whence) => {
-        // console.log('seek', { currentOffset, offset, whence })
-        if (whence === SEEK_WHENCE_FLAG.SEEK_CUR) {
-          return currentOffset + offset
-        }
-        if (whence === SEEK_WHENCE_FLAG.SEEK_END) {
-          return -1
-        }
-        if (whence === SEEK_WHENCE_FLAG.SEEK_SET) {
-          // little trick to prevent libav from requesting end of file data on init that might take a while to fetch
-          // if (!initDone && offset > (contentLength - 1_000_000)) return -1
-          return offset
-        }
-        if (whence === SEEK_WHENCE_FLAG.AVSEEK_SIZE) {
-          return contentLength
-        }
-        return -1
-      },
+      randomRead: (offset, size) =>
+        fetch(
+          VIDEO_URL,
+          {
+            headers: {
+              Range: `bytes=${offset}-${Math.min(offset + size, contentLength) - 1}`
+            }
+          }
+        ).then(res => res.arrayBuffer()),
+      getStream: (offset) =>
+        fetch(
+          VIDEO_URL,
+          {
+            headers: {
+              Range: `bytes=${offset}-`
+            }
+          }
+        ).then(res =>
+          toBufferedStream(3)(
+            toStreamChunkSize(BUFFER_SIZE)(
+              res.body!
+            )
+          )
+        ),
       subtitle: (title, language, subtitle) => {
         // console.log('SUBTITLE HEADER', title, language, subtitle)
       },
       attachment: (filename, mimetype, buffer) => {
         // console.log('attachment', filename, mimetype, buffer)
       },
       write: ({ isHeader, offset, buffer, pts, duration: chunkDuration, pos }) => {
-        // console.log('write', { isHeader, offset, buffer, pts, duration: chunkDuration, pos })
-        if (offset === contentLength) {
-          const lastChunk = chunks.at(-1)
-          if (!lastChunk) throw new Error('No last chunk found')
-          chunks = [
-            ...chunks,
-            {
-              offset,
-              buffer: new Uint8Array(buffer),
-              pts,
-              duration: chunkDuration,
-              pos
-            }
-          ]
-          ended = true
-          return
-        }
-        if (isHeader) {
-          if (!headerChunk) {
-            headerChunk = {
-              offset,
-              buffer: new Uint8Array(buffer),
-              pts,
-              duration: chunkDuration,
-              pos
-            }
-          }
-          return
-        }
-        chunks = [
-          ...chunks,
-          {
-            offset,
-            buffer: new Uint8Array(buffer),
-            pts,
-            duration: chunkDuration,
-            pos
-          }
-        ]
-        // console.log('chunks', chunks)
-        // if (chunks.length === 2) {
-        //   const buffer =
-        //     chunks
-        //       .map(({ buffer }) => buffer)
-        //       .reduce(
-        //         (acc, buffer) => {
-        //           const ab = new Uint8Array(acc.byteLength + buffer.byteLength)
-        //           ab.set(new Uint8Array(acc), 0)
-        //           ab.set(new Uint8Array(buffer), acc.byteLength)
-        //           return ab
-        //         },
-        //         headerChunk.buffer
-        //       )
-        //       .buffer
-        //   console.log('buffer', buffer)
-        //   saveFile(
-        //     buffer,
-        //     'test.mp4',
-        //     'video/mp4'
-        //   )
-        // }
+        // if (isHeader) {
+        //   if (!headerChunk) {
+        //     headerChunk = {
+        //       offset,
+        //       buffer: new Uint8Array(buffer),
+        //       pts,
+        //       duration: chunkDuration,
+        //       pos
+        //     }
+        //   }
+        //   return
+        // }
       }
     })
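Note: `getStream` above composes the two new helpers from src/utils.ts: `toStreamChunkSize(BUFFER_SIZE)` re-slices arbitrary network chunks into exact BUFFER_SIZE buffers, and `toBufferedStream(3)` keeps up to three of those buffered ahead of the consumer. Unrolled with comments for readability:

```ts
// Equivalent of the composition used above; `res` is the open-ended Range response.
const makeStream = (res: Response, bufferSize: number) =>
  toBufferedStream(3)(             // prefetch up to 3 chunks ahead of the consumer
    toStreamChunkSize(bufferSize)( // coalesce network packets into exact bufferSize slices
      res.body!                    // raw byte stream of the HTTP response
    )
  )
```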
 
-    const processingQueue = new PQueue({ concurrency: 1 })
+    const headerChunk = await remuxer.init()
 
-    const process = (timeToProcess = POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) =>
-      processingQueue.add(
-        () => ended ? Promise.resolve(undefined) : transmuxer.process(timeToProcess),
-        { throwOnTimeout: true }
-      )
-
-    await transmuxer.init()
+    if (!headerChunk) throw new Error('No header chunk found after remuxer init')
 
-    // @ts-ignore
-    if (!headerChunk) throw new Error('No header chunk found after transmuxer init')
-
-    const mediaInfo = await transmuxer.getInfo()
+    const mediaInfo = await remuxer.getInfo()
     const duration = mediaInfo.input.duration / 1_000_000
 
     const video = document.createElement('video')
@@ -233,7 +164,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } })
       'seeking',
       'stalled',
       'suspend',
-      'timeupdate',
+      // 'timeupdate',
       'volumechange',
       'waiting'
     ]
@@ -248,15 +179,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } })
     video.controls = true
     video.volume = 0
     video.addEventListener('error', ev => {
-      // @ts-ignore
+      // @ts-expect-error
       console.error(ev.target?.error)
     })
     document.body.appendChild(video)
     document.body.appendChild(seconds)
 
-    setInterval(() => {
-      seconds.textContent = video.currentTime.toString()
-    }, 100)
-
     const mediaSource = new MediaSource()
     video.src = URL.createObjectURL(mediaSource)
@@ -309,17 +236,13 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } })
       })
     )
 
-    // const bufferChunk = (chunk: Chunk) => appendBuffer(chunk.buffer.buffer)
-
-    const bufferChunk = async (chunk: Chunk) => {
-      // console.log('bufferChunk', getTimeRanges().map(range => `${range.start}-${range.end}`).join(' '), chunk)
-      try {
-        await appendBuffer(chunk.buffer.buffer)
-      } catch (err) {
-        console.error(err)
-        throw err
-      }
-      // console.log('bufferedChunk', getTimeRanges().map(range => `${range.start}-${range.end}`).join(' '))
-    }
+    const unbufferRange = async (start: number, end: number) =>
+      queue.add(() =>
+        new Promise((resolve, reject) => {
+          setupListeners(resolve, reject)
+          sourceBuffer.remove(start, end)
+        })
+      )
 
     const getTimeRanges = () =>
       Array(sourceBuffer.buffered.length)
@@ -330,202 +253,121 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } })
           end: sourceBuffer.buffered.end(index)
         }))
 
-    const getTimeRange = (time: number) =>
-      getTimeRanges()
-        .find(({ start, end }) => time >= start && time <= end)
-
-    const unbufferRange = async (start: number, end: number) =>
-      queue.add(() =>
-        new Promise((resolve, reject) => {
-          setupListeners(resolve, reject)
-          sourceBuffer.remove(start, end)
-        })
-      )
-
-    const unbufferChunk = (chunk: Chunk) => unbufferRange(chunk.pts, chunk.pts + chunk.duration)
-
-    const removeChunk = async (chunk: Chunk) => {
-      const chunkIndex = chunks.indexOf(chunk)
-      if (chunkIndex === -1) throw new RangeError('No chunk found')
-      await unbufferChunk(chunk)
-      // chunks = chunks.filter(_chunk => _chunk !== chunk)
-    }
+    video.addEventListener('canplaythrough', () => {
+      video.playbackRate = 1
+      video.play()
+    }, { once: true })
+
+    let chunks: Chunk[] = []
+
+    const PREVIOUS_BUFFER_COUNT = 1
+    const BUFFER_COUNT = 3
+
+    await appendBuffer(headerChunk.buffer)
+
+    const pull = async () => {
+      const chunk = await remuxer.read()
+      chunks = [...chunks, chunk]
+      return chunk
+    }
 
-    // todo: add error checker & retry to seek a bit earlier
-    const seek = queuedDebounceWithLastCall(500, async (time: number) => {
-      ended = false
-      const ranges = getTimeRanges()
-      if (ranges.some(({ start, end }) => time >= start && time <= end)) {
-        return
-      }
-      const allTasksDone = new Promise(resolve => {
-        processingQueue.size && processingQueue.pending
-          ? (
-            processingQueue.on(
-              'next',
-              () =>
-                processingQueue.pending === 0
-                  ? resolve(undefined)
-                  : undefined
-            )
-          )
-          : resolve(undefined)
-      })
-      processingQueue.pause()
-      processingQueue.clear()
-      await allTasksDone
-      processingQueue.start()
-
-      const seekTime = Math.max(0, time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS)
-      await transmuxer.seek(seekTime)
-      await process(POST_SEEK_NEEDED_BUFFERS_IN_SECONDS + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS)
-      for (const range of ranges) {
-        await unbufferRange(range.start, range.end)
-      }
-      for (const chunk of chunks) {
-        if (chunk.pts <= seekTime) continue
-        await bufferChunk(chunk)
-      }
-      video.currentTime = time
-    })
+    let seeking = false
+
+    const updateBuffers = queuedDebounceWithLastCall(250, async () => {
+      if (seeking) return
+      const { currentTime } = video
+      const currentChunkIndex = chunks.findIndex(({ pts, duration }) => pts <= currentTime && pts + duration >= currentTime)
+      const sliceIndex = Math.max(0, currentChunkIndex - PREVIOUS_BUFFER_COUNT)
+
+      // console.log('currentChunkIndex', currentChunkIndex, chunks.length, currentTime)
+      for (let i = 0; i < sliceIndex + BUFFER_COUNT; i++) {
+        // console.log('pull check', i, chunks[i])
+        if (chunks[i]) continue
+        // console.log('pulling')
+        const chunk = await pull()
+        // console.log('pull', chunk)
+        await appendBuffer(chunk.buffer)
+      }
+
+      if (sliceIndex) chunks = chunks.slice(sliceIndex)
+
+      const bufferedRanges = getTimeRanges()
+
+      const firstChunk = chunks.at(0)
+      const lastChunk = chunks.at(-1)
+      if (!firstChunk || !lastChunk || firstChunk === lastChunk) return
+      const minTime = firstChunk.pts
+
+      for (const { start, end } of bufferedRanges) {
+        const chunkIndex = chunks.findIndex(({ pts, duration }) => start <= (pts + (duration / 2)) && (pts + (duration / 2)) <= end)
+        if (chunkIndex === -1) {
+          await unbufferRange(start, end)
+        } else {
+          if (start < minTime) {
+            await unbufferRange(
+              start,
+              minTime
+            )
+          }
+        }
+      }
+    })
-    const updateBufferedRanges = async (time: number) => {
-      const ranges1 = getTimeRanges()
-      // console.log('updateBufferedRanges', ranges1, chunks)
-      const neededChunks =
-        chunks
-          .filter(({ pts, duration }) =>
-            ((time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS) < pts)
-            && ((time + POST_SEEK_REMOVE_BUFFERS_IN_SECONDS) > (pts + duration))
-          )
-
-      const shouldBeBufferedChunks =
-        neededChunks
-          .filter(({ pts, duration }) =>
-            ((time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS) < pts)
-            && ((time + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) > (pts + duration))
-          )
-
-      const shouldBeUnbufferedChunks =
-        chunks
-          .filter(({ pts, duration }) => ranges1.some(({ start, end }) => start < (pts + (duration / 2)) && (pts + (duration / 2)) < end))
-          .filter((chunk) => !shouldBeBufferedChunks.includes(chunk))
-
-      const nonNeededChunks =
-        chunks
-          .filter((chunk) => !neededChunks.includes(chunk))
-
-      for (const shouldBeUnbufferedChunk of shouldBeUnbufferedChunks) {
-        await unbufferChunk(shouldBeUnbufferedChunk)
-      }
-      for (const nonNeededChunk of nonNeededChunks) {
-        await removeChunk(nonNeededChunk)
-      }
-      const firstChunk = neededChunks.sort(({ pts }, { pts: pts2 }) => pts - pts2).at(0)
-      const lastChunk = neededChunks.sort(({ pts, duration }, { pts: pts2, duration: duration2 }) => (pts + duration) - (pts2 + duration2)).at(-1)
-
-      // if (firstChunk && lastChunk) {
-      //   console.log('firstChunk & lastChunk', firstChunk.pts, lastChunk.pts + lastChunk.duration)
-      //   sourceBuffer.appendWindowStart = Math.max(time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS, 0)
-      //   sourceBuffer.appendWindowEnd = Math.min(time + PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS, duration)
-      // } else {
-      //   sourceBuffer.appendWindowStart = 0
-      //   sourceBuffer.appendWindowEnd = Infinity
-      // }
-      // console.log('shouldBeBufferedChunks', shouldBeBufferedChunks)
-      for (const chunk of shouldBeBufferedChunks) {
-        // if (chunk.buffered) continue
-        try {
-          // console.log('RANGES', getTimeRanges().map(({ start, end }) => `${start} - ${end}`))
-          await bufferChunk(chunk)
-          // console.log('RANGES 2', getTimeRanges().map(({ start, end }) => `${start} - ${end}`))
-        } catch (err) {
-          console.error(err)
-          if (!(err instanceof Event)) throw err
-          break
-        }
-      }
-
-      const lowestAllowedStart =
-        firstChunk
-          ? Math.max(firstChunk?.pts - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS, 0)
-          : undefined
-      const highestAllowedEnd =
-        lastChunk
-          ? Math.min(lastChunk.pts + lastChunk.duration + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS, duration)
-          : undefined
-      const ranges = getTimeRanges()
-      // console.log('ranges', ranges, lowestAllowedStart, highestAllowedEnd, neededChunks, chunks)
-      for (const { start, end } of ranges) {
-        if (!lowestAllowedStart || !highestAllowedEnd) continue
-        // console.log('range', start, end)
-        if (lowestAllowedStart !== undefined && start < lowestAllowedStart) {
-          await unbufferRange(start, lowestAllowedStart)
-        }
-        if (highestAllowedEnd !== undefined && end > highestAllowedEnd) {
-          await unbufferRange(highestAllowedEnd, end)
-        }
-      }
-    }
-
-    await appendBuffer(headerChunk.buffer)
-    await new Promise(resolve => {
-      video.addEventListener('loadedmetadata', () => resolve(undefined), { once: true })
-    })
-
-    // console.log('ranges', getTimeRanges())
-    // console.groupCollapsed('process 30')
-    await process(30)
-    // console.groupEnd()
-    await updateBufferedRanges(0)
-    // console.log('ranges', getTimeRanges())
-
-    // console.groupCollapsed('process 10')
-    // await process(10)
-    // console.groupEnd()
-
-    // console.groupCollapsed('seek 30')
-    // await transmuxer.seek(30)
-    // console.groupEnd()
-    // console.groupCollapsed('process 10')
-    // await process(10)
-    // console.groupEnd()
-    // await updateBufferedRanges(0)
-
-    // setInterval(() => {
-    //   console.log('ranges', getTimeRanges())
-    // }, 5_000)
-
-    const timeUpdateWork = queuedDebounceWithLastCall(500, async (time: number) => {
-      const lastChunk = chunks.sort(({ pts }, { pts: pts2 }) => pts - pts2).at(-1)
-      if (lastChunk && lastChunk.pts < time + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) {
-        await process()
-      }
-      await updateBufferedRanges(time)
-    })
+    const seek = queuedDebounceWithLastCall(500, async (seekTime: number) => {
+      seeking = true
+      await appendBuffer(headerChunk.buffer)
+      chunks = []
+      await remuxer.seek(seekTime)
+      const chunk1 = await pull()
+      sourceBuffer.timestampOffset = chunk1.pts
+      await appendBuffer(chunk1.buffer)
+      seeking = false
+      await updateBuffers()
+    })
+
+    const firstChunk = await pull()
+    appendBuffer(firstChunk.buffer)
 
     video.addEventListener('timeupdate', () => {
-      timeUpdateWork(video.currentTime)
+      updateBuffers()
     })
 
-    video.addEventListener('canplaythrough', () => {
-      video.playbackRate = 1
-      video.play()
-    }, { once: true })
+    video.addEventListener('waiting', () => {
+      updateBuffers()
+    })
 
-    video.addEventListener('seeking', () => {
-      // console.log('SEEKING', video.currentTime)
+    video.addEventListener('seeking', (ev) => {
+      if (seeking) return
       seek(video.currentTime)
     })
 
-    await new Promise(resolve => setTimeout(resolve, 1000))
-
-    video.pause()
-    await seek(1370)
-    video.play()
-
-    // video.currentTime = 1360
-    // await new Promise(resolve => setTimeout(resolve, 1000))
-    video.playbackRate = 5
+    updateBuffers()
+
+    setInterval(() => {
+      seconds.textContent = video.currentTime.toString()
+    }, 100)
+
+    // setInterval(async () => {
+    //   console.log('time ranges', getTimeRanges(), chunks)
+    // }, 1000)
+
+    setTimeout(async () => {
+      // await video.pause()
+      // video.currentTime = 587.618314
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.playbackRate = 5
+      // video.currentTime = 400
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.currentTime = 300
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.currentTime = 500
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.currentTime = 600
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.currentTime = 300
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.currentTime = 534.953306
+      // await new Promise(resolve => setTimeout(resolve, 1000))
+      // video.currentTime = 100
+    }, 1000)
   })
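Note on the new `seek` above: after `remuxer.seek(...)` the remuxer is re-initialized, so setting `sourceBuffer.timestampOffset = chunk1.pts` before appending shifts the fragment back to its true position on the timeline (MSE places frames at media timestamp + timestampOffset). A sketch of that append step in isolation, assuming the emitted fragment timeline restarts near zero after re-init:

```ts
// Append one post-seek chunk at its real position on the element's timeline.
const appendAtRealTime = (sourceBuffer: SourceBuffer, chunk: { pts: number, buffer: Uint8Array }) =>
  new Promise<void>(resolve => {
    sourceBuffer.addEventListener('updateend', () => resolve(), { once: true })
    sourceBuffer.timestampOffset = chunk.pts // must be set while !sourceBuffer.updating
    sourceBuffer.appendBuffer(chunk.buffer)
  })
```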
diff --git a/src/utils.ts b/src/utils.ts
index c7cf8e0..6ef0b9e 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -25,6 +25,11 @@
   AVSEEK_SIZE = 1 << 16 //0x10000,
 }
 
+export enum AVERROR {
+  EOF = -541478725,
+  EXIT = 255
+}
+
 export const notifyInterface = (sharedArrayBuffer: SharedArrayBuffer, value: State) => {
   const int32Array = new Int32Array(sharedArrayBuffer)
   int32Array.set([value], 0)
@@ -128,30 +133,40 @@ export const queuedDebounceWithLastCall =
 
-export const bufferStream = ({ stream, size: SIZE }: { stream: ReadableStream, size: number }) =>
-  new ReadableStream({
+// todo: reimplement this into a ReadableByteStream https://web.dev/streams/ once Safari gets support
+export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) =>
+  new ReadableStream({
+    reader: undefined,
+    leftOverData: undefined,
+    closed: false,
     start() {
-      // @ts-ignore
       this.reader = stream.getReader()
+      this.reader.closed.then(() => {
+        this.closed = true
+      })
     },
     async pull(controller) {
-      // @ts-ignore
       const { leftOverData }: { leftOverData: Uint8Array | undefined } = this
 
       const accumulate = async ({ buffer = new Uint8Array(SIZE), currentSize = 0 } = {}): Promise<{ buffer?: Uint8Array, currentSize?: number, done: boolean }> => {
+        if (this.closed) {
+          this.reader = undefined
+          this.leftOverData = undefined
+          return { buffer: new Uint8Array(), currentSize: 0, done: true }
+        }
-        // @ts-ignore
-        const { value: newBuffer, done } = await this.reader.read()
+        const { value: newBuffer, done } = await this.reader!.read()
 
         if (currentSize === 0 && leftOverData) {
           buffer.set(leftOverData)
           currentSize += leftOverData.byteLength
-          // @ts-ignore
           this.leftOverData = undefined
         }
 
         if (done) {
-          return { buffer: buffer.slice(0, currentSize), currentSize, done }
+          const finalResult = { buffer: buffer.slice(0, currentSize), currentSize, done }
+          this.reader = undefined
+          this.leftOverData = undefined
+          return finalResult
         }
 
         let newSize
@@ -160,7 +175,6 @@ export const bufferStream = ({ stream, size: SIZE }: { stream: ReadableStream, s
         buffer.set(slicedBuffer, currentSize)
 
         if (newSize === SIZE) {
-          // @ts-ignore
          this.leftOverData = newBuffer.slice(SIZE - currentSize)
           return { buffer, currentSize: newSize, done: false }
         }
@@ -170,5 +184,70 @@ export const bufferStream = ({ stream, size: SIZE }: { stream: ReadableStream, s
       const { buffer, done } = await accumulate()
       if (buffer?.byteLength) controller.enqueue(buffer)
       if (done) controller.close()
-    }
-  })
+    },
+    cancel() {
+      this.reader!.cancel()
+      this.reader = undefined
+      this.leftOverData = undefined
+    }
+  } as UnderlyingDefaultSource & {
+    reader: ReadableStreamDefaultReader | undefined
+    leftOverData: Uint8Array | undefined
+    closed: boolean
+  })
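Note: usage sketch for `toStreamChunkSize`; whatever sizes the upstream enqueues, the reader sees exact SIZE-byte chunks, with only the final chunk allowed to be shorter:

```ts
const upstream = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new Uint8Array([1, 2, 3]))
    controller.enqueue(new Uint8Array([4, 5]))
    controller.enqueue(new Uint8Array([6, 7, 8, 9]))
    controller.close()
  }
})

const reader = toStreamChunkSize(4)(upstream).getReader()
// yields [1,2,3,4], then [5,6,7,8], then the short tail [9]
for (let result = await reader.read(); !result.done; result = await reader.read()) {
  console.log(result.value)
}
```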
+
+export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) =>
+  new ReadableStream({
+    buffers: [],
+    currentPullPromise: undefined,
+    reader: undefined,
+    leftOverData: undefined,
+    closed: false,
+    start() {
+      this.reader = stream.getReader()
+      this.reader.closed.then(() => {
+        this.closed = true
+      })
+    },
+    async pull(controller) {
+      const pull = async () => {
+        if (this.closed) return
+        if (this.buffers.length >= SIZE) return
+        this.currentPullPromise = this.reader!.read()
+        const { value: newBuffer, done } = await this.currentPullPromise
+        this.currentPullPromise = undefined
+        if (done) {
+          controller.close()
+          return
+        }
+        this.buffers.push(newBuffer)
+        return newBuffer
+      }
+
+      const tryToBuffer = async (): Promise<void> => {
+        if (this.buffers.length >= SIZE || this.closed) return
+
+        if (this.buffers.length === 0) {
+          await pull()
+          return tryToBuffer()
+        } else {
+          pull().then(() => {
+            tryToBuffer()
+          })
+        }
+      }
+
+      await tryToBuffer()
+      controller.enqueue(this.buffers.shift())
+      tryToBuffer()
+    },
+    cancel() {
+      this.reader!.cancel()
+    }
+  } as UnderlyingDefaultSource & {
+    reader: ReadableStreamDefaultReader | undefined
+    leftOverData: Uint8Array | undefined
+    buffers: Uint8Array[]
+    currentPullPromise: Promise<ReadableStreamReadResult<Uint8Array>> | undefined
+    closed: boolean
+  })
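Note: `toBufferedStream(SIZE)` is read-ahead rather than re-chunking: it eagerly pulls from the wrapped stream in the background until SIZE chunks are queued, so a consumer read is usually served from memory instead of waiting on the network. Typical composition, as used by `getStream` in src/test.ts:

```ts
// Wrap a fetch body so up to 3 fixed-size chunks are prefetched ahead of the consumer.
const res = await fetch('../video5.mkv', { headers: { Range: 'bytes=0-' } }) // example URL from src/test.ts
const stream = toBufferedStream(3)(toStreamChunkSize(2_500_000)(res.body!))
const { value } = await stream.getReader().read() // served from the read-ahead queue once warm
```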
undefined + : console.error(text), }) let module: ReturnType -// todo: if seek latency is too slow, because of destroy + init + seek + process, we can use multiple transmuxer already initialized waiting to seek + process -// todo: We can keep in memory all of the chunks needed to initialize the transmuxer +// todo: if seek latency is too slow, because of destroy + init + seek + process, we can use multiple remuxer alrandomReady initialized waiting to seek + process +// todo: We can keep in memory all of the chunks needed to initialize the remuxer // @ts-ignore const init = makeCallListener(async ( - { publicPath, length, bufferSize, read, seek, write, attachment, subtitle }: + { publicPath, length, bufferSize, randomRead, streamRead, clearStream, write, attachment, subtitle }: { publicPath: string length: number bufferSize: number - read: (offset: number, size: number) => Promise + randomRead: (offset: number, size: number) => Promise + streamRead: (offset: number) => Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }> + clearStream: () => Promise write: (params: { offset: number, arrayBuffer: ArrayBuffer, isHeader: boolean, position: number, pts: number, duration: number }) => Promise | void - seek: (currentOffset: number, offset: number, whence: SEEK_WHENCE_FLAG) => Promise subtitle: (streamIndex: number, isHeader: boolean, data: string, ...rest: [number, number] | [string, string]) => Promise attachment: (filename: string, mimetype: string, buffer: ArrayBuffer) => Promise }) => { if (!module) module = await makeModule(publicPath) - let currentOffset = 0 - let currentBuffer = new Uint8Array(0) - const makeTransmuxer = () => new module.Transmuxer({ + + let writeBuffer = new Uint8Array(0) + + let readResultPromiseResolve: (value: Chunk) => void + let readResultPromiseReject: (reason?: any) => void + let readResultPromise: Promise + + const makeRemuxer = () => new module.Remuxer({ promise: Promise.resolve(), length, bufferSize, @@ -52,63 +58,77 @@ const init = makeCallListener(async ( const arraybuffer = uint8.buffer.slice(uint8.byteOffset, uint8.byteOffset + uint8.byteLength) attachment(filename, mimetype, arraybuffer) }, - seek: async (offset: number, whence: SEEK_WHENCE_FLAG) => seek(currentOffset, offset, whence), - read: async (offset: number, bufferSize: number) => { - const buffer = await read(offset, bufferSize) - currentOffset = offset + buffer.byteLength + streamRead: async (_offset: string) => { + const offset = Number(_offset) + const res = await streamRead(offset) return { - buffer, - size: buffer.byteLength + ...res, + value: new Uint8Array(res.value ?? 
 
-  let transmuxer: ReturnType<typeof makeTransmuxer> = makeTransmuxer()
+  let remuxer: ReturnType<typeof makeRemuxer> = makeRemuxer()
 
-  let firstInit = true
   return {
     init: async () => {
-      currentOffset = 0
-      currentBuffer = new Uint8Array(0)
+      writeBuffer = new Uint8Array(0)
       module = await makeModule(publicPath)
-      transmuxer = makeTransmuxer()
-      await transmuxer.init(firstInit)
-      if (firstInit) firstInit = false
+      remuxer = makeRemuxer()
+
+      readResultPromise = new Promise<Chunk>((resolve, reject) => {
+        readResultPromiseResolve = resolve
+        readResultPromiseReject = reject
+      })
+      remuxer.init()
+      return readResultPromise
     },
     destroy: () => {
-      transmuxer.destroy()
-      transmuxer = undefined
+      remuxer.destroy()
+      remuxer = undefined
       module = undefined
-      currentOffset = 0
-      currentBuffer = new Uint8Array(0)
+      writeBuffer = new Uint8Array(0)
+    },
+    seek: (timestamp: number) => remuxer.seek(timestamp),
+    read: () => {
+      readResultPromise = new Promise<Chunk>((resolve, reject) => {
+        readResultPromiseResolve = resolve
+        readResultPromiseReject = reject
+      })
+      remuxer.read()
+      return readResultPromise
     },
-    seek: (timestamp: number, flags: SEEK_FLAG) => transmuxer.seek(timestamp, flags),
-    // todo: For some reason transmuxer was undefined on firefox after a pretty normal seek(not fast seeking or anything), refactor this to prevent issues like this
-    process: (timeToProcess: number) => transmuxer.process(timeToProcess),
-    getInfo: () => transmuxer.getInfo()
+    getInfo: () => remuxer.getInfo()
   }
 })
diff --git a/vite.config.ts b/vite.config.ts
index 0cbfe51..9b8a131 100644
--- a/vite.config.ts
+++ b/vite.config.ts
@@ -20,6 +20,20 @@ export default defineConfig((env) => ({
       env.mode === 'development'
         ? []
         : [commonjs()]
-    )
-  ]
+    ),
+    {
+      name: 'configure-response-headers',
+      configureServer: (server) => {
+        server.middlewares.use((_req, res, next) => {
+          res.setHeader('Cache-Control', 'no-store')
+          next()
+        })
+      }
+    }
+  ],
+  server: {
+    fs: {
+      allow: ['../..']
+    }
+  }
 }))