From 7152e6a23a0751cc92c99a48f2cdfc5d6dc9aef3 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 21 Jan 2024 01:35:04 +0100 Subject: [PATCH 01/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fstart=20big?= =?UTF-8?q?=20refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 3 + src/main.cpp | 80 ++++++----- src/test.ts | 337 ++++++-------------------------------------- src/utils.ts | 71 ++++++++-- src/worker/index.ts | 31 ++-- 5 files changed, 171 insertions(+), 351 deletions(-) diff --git a/Makefile b/Makefile index 310cf6b..e13a129 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,9 @@ dist/libav-wasm.js: -s STACK_SIZE=50mb \ -s ASYNCIFY \ -s MODULARIZE=1 \ + -g \ + -gsource-map \ + --source-map-base http://localhost:1234/dist/ \ -s ASSERTIONS=2 \ -lavcodec -lavformat -lavfilter -lavdevice -lswresample -lswscale -lavutil -lm -lx264 \ -o dist/libav.js \ diff --git a/src/main.cpp b/src/main.cpp index 47f0807..7ddea72 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -35,7 +35,7 @@ extern "C" { static int readOutputFunction(void* opaque, uint8_t* buf, int buf_size); static int64_t seekFunction(void* opaque, int64_t offset, int whence); - class Transmuxer { + class Remuxer { public: AVIOContext* input_avio_context; AVIOContext* output_avio_context; @@ -63,13 +63,18 @@ extern "C" { std::string video_mime_type; std::string audio_mime_type; - val read = val::undefined(); + val randomRead = val::undefined(); + + val streamRead = val::undefined(); + val currentReadStream = val::undefined(); + val attachment = val::undefined(); val subtitle = val::undefined(); val write = val::undefined(); - val seek = val::undefined(); val error = val::undefined(); + double currentOffset = 0; + bool first_init = true; bool initializing = false; @@ -88,15 +93,15 @@ extern "C" { printf("\n"); } - Transmuxer(val options) { + Remuxer(val options) { promise = options["promise"]; input_length = options["length"].as(); buffer_size = options["bufferSize"].as(); - read = options["read"]; + randomRead = options["randomRead"]; + streamRead = options["streamRead"]; attachment = options["attachment"]; subtitle = options["subtitle"]; write = options["write"]; - seek = options["seek"]; error = options["error"]; } @@ -209,7 +214,7 @@ extern "C" { return mime_type; } - void init (bool _first_init) { + void init () { initializing = true; init_buffer_count = 0; int res; @@ -232,7 +237,7 @@ extern "C" { buffer_size, 0, reinterpret_cast(this), - &readFunction, + &randomReadFunction, nullptr, &seekFunction ); @@ -576,7 +581,7 @@ extern "C" { int _seek(int timestamp, int flags) { destroy(); - init(false); + init(); process(0.01); @@ -618,11 +623,19 @@ extern "C" { // Seek callback called by AVIOContext static int64_t seekFunction(void* opaque, int64_t offset, int whence) { - Transmuxer &remuxObject = *reinterpret_cast(opaque); - emscripten::val &seek = remuxObject.seek; - // call the JS seek function - double result = seek(static_cast(offset), whence).await().as(); - return result; + Remuxer &remuxObject = *reinterpret_cast(opaque); + if (whence === SEEK_WHENCE_FLAG.SEEK_CUR) { + return remuxObject.currentOffset + offset + } + if (whence === SEEK_WHENCE_FLAG.SEEK_END) { + return -1 + } + if (whence === SEEK_WHENCE_FLAG.SEEK_SET) { + return offset + } + if (whence === SEEK_WHENCE_FLAG.AVSEEK_SIZE) { + return remuxObject.input_length + } } // If emscripten asynchify ever start working for libraries callbacks, @@ -630,13 +643,12 @@ extern "C" { // Read callback called by AVIOContext static int readFunction(void* opaque, uint8_t* buf, int buf_size) { - Transmuxer &remuxObject = *reinterpret_cast(opaque); - emscripten::val &read = remuxObject.read; + Remuxer &remuxObject = *reinterpret_cast(opaque); std::string buffer; if (remuxObject.initializing) { + emscripten::val &randomRead = remuxObject.randomRead; if (remuxObject.first_init) { - val res = read(static_cast(remuxObject.input_format_context->pb->pos), buf_size).await(); - buffer = res["buffer"].as(); + buffer = randomRead(static_cast(remuxObject.input_format_context->pb->pos), buf_size).await().as(); remuxObject.init_buffers.push_back(buffer); } else { remuxObject.promise.await(); @@ -644,13 +656,16 @@ extern "C" { remuxObject.init_buffer_count++; } } else { - // call the JS read function and get its result as - // { - // buffer: Uint8Array, - // size: int - // } - val res = read(static_cast(remuxObject.input_format_context->pb->pos), buf_size).await(); - buffer = res["buffer"].as(); + emscripten::val &streamRead = remuxObject.streamRead; + if (!remuxObject.currentReadStream) { + remuxObject.currentReadStream = streamRead(static_cast(remuxObject.input_format_context->pb->pos)).await(); + } + emscripten::val result = remuxObject.currentReadStream["read"]().await(); + bool is_done = result["done"].as(); + if (is_done) { + return AVERROR_EOF + } + buffer = result["value"].as(); } int buffer_size = buffer.size(); @@ -659,13 +674,14 @@ extern "C" { } // copy the result buffer into AVIO's buffer memcpy(buf, (uint8_t*)buffer.c_str(), buffer_size); + remuxObject.currentOffset = remuxObject.currentOffset + buffer.byteLength // If result buffer size is 0, we reached the end of the file return buffer_size; } // Write callback called by AVIOContext static int writeFunction(void* opaque, uint8_t* buf, int buf_size) { - Transmuxer &remuxObject = *reinterpret_cast(opaque); + Remuxer &remuxObject = *reinterpret_cast(opaque); if (remuxObject.initializing && !remuxObject.first_init) { return buf_size; @@ -710,12 +726,12 @@ extern "C" { .field("input", &InfoObject::input) .field("output", &InfoObject::output); - class_("Transmuxer") + class_("Remuxer") .constructor() - .function("init", &Transmuxer::init) - .function("process", &Transmuxer::process) - .function("destroy", &Transmuxer::destroy) - .function("seek", &Transmuxer::_seek) - .function("getInfo", &Transmuxer::getInfo); + .function("init", &Remuxer::init) + .function("process", &Remuxer::process) + .function("destroy", &Remuxer::destroy) + .function("seek", &Remuxer::_seek) + .function("getInfo", &Remuxer::getInfo); } } diff --git a/src/test.ts b/src/test.ts index 9ada331..5731c1e 100644 --- a/src/test.ts +++ b/src/test.ts @@ -1,7 +1,7 @@ // @ts-ignore import PQueue from 'p-queue' -import { SEEK_WHENCE_FLAG, queuedDebounceWithLastCall } from './utils' +import { SEEK_WHENCE_FLAG, queuedDebounceWithLastCall, toBufferedStream, toStreamChunkSize } from './utils' import { makeTransmuxer } from '.' type Chunk = { @@ -16,7 +16,7 @@ const BUFFER_SIZE = 5_000_000 const VIDEO_URL = '../video5.mkv' // const VIDEO_URL = '../spidey.mkv' const PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS = 10 -const POST_SEEK_NEEDED_BUFFERS_IN_SECONDS = 30 +const POST_SEEK_NEEDED_BUFFERS_IN_SECONDS = 15 const POST_SEEK_REMOVE_BUFFERS_IN_SECONDS = 60 export default async function saveFile(plaintext: ArrayBuffer, fileName: string, fileType: string) { @@ -64,6 +64,8 @@ export default async function saveFile(plaintext: ArrayBuffer, fileName: string, } + + fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) .then(async ({ headers, body }) => { if (!body) throw new Error('no body') @@ -75,7 +77,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) let headerChunk: Chunk let ended = false - let chunks: Chunk[] = [] const workerUrl2 = new URL('../build/worker.js', import.meta.url).toString() const blob = new Blob([`importScripts(${JSON.stringify(workerUrl2)})`], { type: 'application/javascript' }) @@ -86,39 +87,30 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) workerUrl, bufferSize: BUFFER_SIZE, length: contentLength, - read: (offset, size) => - // console.log('read', offset, size) || - offset >= Math.min(offset + size, contentLength) - ? Promise.resolve(new ArrayBuffer(0)) - : ( - fetch( - VIDEO_URL, - { - headers: { - Range: `bytes=${offset}-${Math.min(offset + size, contentLength) - 1}` - } - } + getStream: (offset, size) => + fetch( + VIDEO_URL, + { + headers: { + Range: `bytes=${offset}-${Math.min(offset + size, contentLength) - 1}` + } + } + ).then(res => res.arrayBuffer()), + readStream: (offset) => + fetch( + VIDEO_URL, + { + headers: { + Range: `bytes=${offset}` + } + } + ).then(res => + toBufferedStream(3)( + toStreamChunkSize(BUFFER_SIZE)( + res.body! ) - .then(res => res.arrayBuffer()) - ), - seek: async (currentOffset, offset, whence) => { - // console.log('seek', { currentOffset, offset, whence }) - if (whence === SEEK_WHENCE_FLAG.SEEK_CUR) { - return currentOffset + offset - } - if (whence === SEEK_WHENCE_FLAG.SEEK_END) { - return -1 - } - if (whence === SEEK_WHENCE_FLAG.SEEK_SET) { - // little trick to prevent libav from requesting end of file data on init that might take a while to fetch - // if (!initDone && offset > (contentLength - 1_000_000)) return -1 - return offset - } - if (whence === SEEK_WHENCE_FLAG.AVSEEK_SIZE) { - return contentLength - } - return -1 - }, + ) + ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) }, @@ -126,23 +118,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) // console.log('attachment', filename, mimetype, buffer) }, write: ({ isHeader, offset, buffer, pts, duration: chunkDuration, pos }) => { - // console.log('write', { isHeader, offset, buffer, pts, duration: chunkDuration, pos }) - if (offset === contentLength) { - const lastChunk = chunks.at(-1) - if (!lastChunk) throw new Error('No last chunk found') - chunks = [ - ...chunks, - { - offset, - buffer: new Uint8Array(buffer), - pts, - duration: chunkDuration, - pos - } - ] - ended = true - return - } if (isHeader) { if (!headerChunk) { headerChunk = { @@ -155,38 +130,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } return } - chunks = [ - ...chunks, - { - offset, - buffer: new Uint8Array(buffer), - pts, - duration: chunkDuration, - pos - } - ] - // console.log('chunks', chunks) - // if (chunks.length === 2) { - // const buffer = - // chunks - // .map(({ buffer }) => buffer) - // .reduce( - // (acc, buffer) => { - // const ab = new Uint8Array(acc.byteLength + buffer.byteLength) - // ab.set(new Uint8Array(acc), 0) - // ab.set(new Uint8Array(buffer), acc.byteLength) - // return ab - // }, - // headerChunk.buffer - // ) - // .buffer - // console.log('buffer', buffer) - // saveFile( - // buffer, - // 'test.mp4', - // 'video/mp4' - // ) - // } } }) @@ -200,7 +143,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await transmuxer.init() - // @ts-ignore + // @ts-expect-error if (!headerChunk) throw new Error('No header chunk found after transmuxer init') const mediaInfo = await transmuxer.getInfo() @@ -238,25 +181,21 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) 'waiting' ] - // for (const event of allVideoEvents) { - // video.addEventListener(event, ev => { - // console.log('video event', event, ev) - // }) - // } + for (const event of allVideoEvents) { + video.addEventListener(event, ev => { + console.log('video event', event, ev) + }) + } const seconds = document.createElement('div') video.controls = true video.volume = 0 video.addEventListener('error', ev => { - // @ts-ignore + // @ts-expect-error console.error(ev.target?.error) }) document.body.appendChild(video) document.body.appendChild(seconds) - - setInterval(() => { - seconds.textContent = video.currentTime.toString() - }, 100) const mediaSource = new MediaSource() video.src = URL.createObjectURL(mediaSource) @@ -309,17 +248,13 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) }) ) - // const bufferChunk = (chunk: Chunk) => appendBuffer(chunk.buffer.buffer) - const bufferChunk = async (chunk: Chunk) => { - // console.log('bufferChunk', getTimeRanges().map(range => `${range.start}-${range.end}`).join(' '), chunk) - try { - await appendBuffer(chunk.buffer.buffer) - } catch (err) { - console.error(err) - throw err - } - // console.log('bufferedChunk', getTimeRanges().map(range => `${range.start}-${range.end}`).join(' ')) - } + const unbufferRange = async (start: number, end: number) => + queue.add(() => + new Promise((resolve, reject) => { + setupListeners(resolve, reject) + sourceBuffer.remove(start, end) + }) + ) const getTimeRanges = () => Array(sourceBuffer.buffered.length) @@ -330,182 +265,8 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) end: sourceBuffer.buffered.end(index) })) - const getTimeRange = (time: number) => - getTimeRanges() - .find(({ start, end }) => time >= start && time <= end) - - const unbufferRange = async (start: number, end: number) => - queue.add(() => - new Promise((resolve, reject) => { - setupListeners(resolve, reject) - sourceBuffer.remove(start, end) - }) - ) - - const unbufferChunk = (chunk: Chunk) => unbufferRange(chunk.pts, chunk.pts + chunk.duration) - - const removeChunk = async (chunk: Chunk) => { - const chunkIndex = chunks.indexOf(chunk) - if (chunkIndex === -1) throw new RangeError('No chunk found') - await unbufferChunk(chunk) - // chunks = chunks.filter(_chunk => _chunk !== chunk) - } - - // todo: add error checker & retry to seek a bit earlier - const seek = queuedDebounceWithLastCall(500, async (time: number) => { - ended = false - const ranges = getTimeRanges() - if (ranges.some(({ start, end }) => time >= start && time <= end)) { - return - } - const allTasksDone = new Promise(resolve => { - processingQueue.size && processingQueue.pending - ? ( - processingQueue.on( - 'next', - () => - processingQueue.pending === 0 - ? resolve(undefined) - : undefined - ) - ) - : resolve(undefined) - }) - processingQueue.pause() - processingQueue.clear() - await allTasksDone - processingQueue.start() - - const seekTime = Math.max(0, time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS) - await transmuxer.seek(seekTime) - await process(POST_SEEK_NEEDED_BUFFERS_IN_SECONDS + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) - for (const range of ranges) { - await unbufferRange(range.start, range.end) - } - for (const chunk of chunks) { - if (chunk.pts <= seekTime) continue - await bufferChunk(chunk) - } - video.currentTime = time - }) - - const updateBufferedRanges = async (time: number) => { - const ranges1 = getTimeRanges() - // console.log('updateBufferedRanges', ranges1, chunks) - const neededChunks = - chunks - .filter(({ pts, duration }) => - ((time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS) < pts) - && ((time + POST_SEEK_REMOVE_BUFFERS_IN_SECONDS) > (pts + duration)) - ) - - const shouldBeBufferedChunks = - neededChunks - .filter(({ pts, duration }) => - ((time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS) < pts) - && ((time + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) > (pts + duration)) - ) - - const shouldBeUnbufferedChunks = - chunks - .filter(({ pts, duration }) => ranges1.some(({ start, end }) => start < (pts + (duration / 2)) && (pts + (duration / 2)) < end)) - .filter((chunk) => !shouldBeBufferedChunks.includes(chunk)) - - const nonNeededChunks = - chunks - .filter((chunk) => !neededChunks.includes(chunk)) - - for (const shouldBeUnbufferedChunk of shouldBeUnbufferedChunks) { - await unbufferChunk(shouldBeUnbufferedChunk) - } - for (const nonNeededChunk of nonNeededChunks) { - await removeChunk(nonNeededChunk) - } - const firstChunk = neededChunks.sort(({ pts }, { pts: pts2 }) => pts - pts2).at(0) - const lastChunk = neededChunks.sort(({ pts, duration }, { pts: pts2, duration: duration2 }) => (pts + duration) - (pts2 + duration2)).at(-1) - - // if (firstChunk && lastChunk) { - // console.log('firstChunk & lastChunk', firstChunk.pts, lastChunk.pts + lastChunk.duration) - // sourceBuffer.appendWindowStart = Math.max(time - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS, 0) - // sourceBuffer.appendWindowEnd = Math.min(time + PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS, duration) - // } else { - // sourceBuffer.appendWindowStart = 0 - // sourceBuffer.appendWindowEnd = Infinity - // } - // console.log('shouldBeBufferedChunks', shouldBeBufferedChunks) - for (const chunk of shouldBeBufferedChunks) { - // if (chunk.buffered) continue - try { - // console.log('RANGES', getTimeRanges().map(({ start, end }) => `${start} - ${end}`)) - await bufferChunk(chunk) - // console.log('RANGES 2', getTimeRanges().map(({ start, end }) => `${start} - ${end}`)) - } catch (err) { - console.error(err) - if (!(err instanceof Event)) throw err - break - } - } - - const lowestAllowedStart = - firstChunk - ? Math.max(firstChunk?.pts - PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS, 0) - : undefined - const highestAllowedEnd = - lastChunk - ? Math.min(lastChunk.pts + lastChunk.duration + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS, duration) - : undefined - const ranges = getTimeRanges() - // console.log('ranges', ranges, lowestAllowedStart, highestAllowedEnd, neededChunks, chunks) - for (const { start, end } of ranges) { - if (!lowestAllowedStart || !highestAllowedEnd) continue - // console.log('range', start, end) - if (lowestAllowedStart !== undefined && start < lowestAllowedStart) { - await unbufferRange(start, lowestAllowedStart) - } - if (highestAllowedEnd !== undefined && end > highestAllowedEnd) { - await unbufferRange(highestAllowedEnd, end) - } - } - } - - await appendBuffer(headerChunk.buffer) - await new Promise(resolve => { - video.addEventListener('loadedmetadata', () => resolve(undefined), { once: true }) - }) - - // console.log('ranges', getTimeRanges()) - // console.groupCollapsed('process 30') - await process(30) - // console.groupEnd() - await updateBufferedRanges(0) - // console.log('ranges', getTimeRanges()) - - // console.groupCollapsed('process 10') - // await process(10) - // console.groupEnd() - - // console.groupCollapsed('seek 30') - // await transmuxer.seek(30) - // console.groupEnd() - // console.groupCollapsed('process 10') - // await process(10) - // console.groupEnd() - // await updateBufferedRanges(0) - - // setInterval(() => { - // console.log('ranges', getTimeRanges()) - // }, 5_000) - - const timeUpdateWork = queuedDebounceWithLastCall(500, async (time: number) => { - const lastChunk = chunks.sort(({ pts }, { pts: pts2 }) => pts - pts2).at(-1) - if (lastChunk && lastChunk.pts < time + POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) { - await process() - } - await updateBufferedRanges(time) - }) - video.addEventListener('timeupdate', () => { - timeUpdateWork(video.currentTime) + seconds.textContent = video.currentTime.toString() }) video.addEventListener('canplaythrough', () => { @@ -514,18 +275,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) }, { once: true }) video.addEventListener('seeking', () => { - // console.log('SEEKING', video.currentTime) - seek(video.currentTime) - }) - - await new Promise(resolve => setTimeout(resolve, 1000)) - - video.pause() - await seek(1370) - video.play() - - // video.currentTime = 1360 - // await new Promise(resolve => setTimeout(resolve, 1000)) - video.playbackRate = 5 + }) }) diff --git a/src/utils.ts b/src/utils.ts index c7cf8e0..2431137 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -25,6 +25,11 @@ export enum SEEK_WHENCE_FLAG { AVSEEK_SIZE = 1 << 16 //0x10000, } +export enum AVERROR { + EOF = -541478725, + EXIT = 255 +} + export const notifyInterface = (sharedArrayBuffer: SharedArrayBuffer, value: State) => { const int32Array = new Int32Array(sharedArrayBuffer) int32Array.set([value], 0) @@ -128,25 +133,23 @@ export const queuedDebounceWithLastCall = +// todo: reimplement this into a ReadableByteStream https://web.dev/streams/ once Safari gets support +export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) => new ReadableStream({ + reader: undefined, + leftOverData: undefined, start() { - // @ts-ignore this.reader = stream.getReader() }, async pull(controller) { - // @ts-ignore const { leftOverData }: { leftOverData: Uint8Array | undefined } = this const accumulate = async ({ buffer = new Uint8Array(SIZE), currentSize = 0 } = {}): Promise<{ buffer?: Uint8Array, currentSize?: number, done: boolean }> => { - // @ts-ignore - const { value: newBuffer, done } = await this.reader.read() + const { value: newBuffer, done } = await this.reader!.read() if (currentSize === 0 && leftOverData) { buffer.set(leftOverData) currentSize += leftOverData.byteLength - // @ts-ignore this.leftOverData = undefined } @@ -160,7 +163,6 @@ export const bufferStream = ({ stream, size: SIZE }: { stream: ReadableStream, s buffer.set(slicedBuffer, currentSize) if (newSize === SIZE) { - // @ts-ignore this.leftOverData = newBuffer.slice(SIZE - currentSize) return { buffer, currentSize: newSize, done: false } } @@ -170,5 +172,58 @@ export const bufferStream = ({ stream, size: SIZE }: { stream: ReadableStream, s const { buffer, done } = await accumulate() if (buffer?.byteLength) controller.enqueue(buffer) if (done) controller.close() + }, + cancel() { + this.reader!.cancel() + } + } as UnderlyingDefaultSource & { reader: ReadableStreamDefaultReader | undefined, leftOverData: Uint8Array | undefined }) + +export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) => + new ReadableStream({ + buffers: [], + currentPullPromise: undefined, + reader: undefined, + leftOverData: undefined, + start() { + this.reader = stream.getReader() + }, + async pull(controller) { + const pull = async () => { + if (await this.reader!.closed) return + if (this.buffers.length >= SIZE) return + this.currentPullPromise = this.reader!.read() + const { value: newBuffer, done } = await this.currentPullPromise + this.currentPullPromise = undefined + if (done) { + controller.close() + return + } + this.buffers.push(newBuffer) + return newBuffer + } + + const tryToBuffer = async (): Promise => { + if (this.buffers.length >= SIZE) return + + if (this.buffers.length === 0) { + await pull() + } else { + pull() + } + + return tryToBuffer() + } + + await tryToBuffer() + controller.enqueue(this.buffers.shift()) + tryToBuffer() + }, + cancel() { + this.reader!.cancel() } + } as UnderlyingDefaultSource & { + reader: ReadableStreamDefaultReader | undefined + leftOverData: Uint8Array | undefined + buffers: Uint8Array[] + currentPullPromise: Promise> | undefined }) diff --git a/src/worker/index.ts b/src/worker/index.ts index 868097e..ad897be 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -15,8 +15,8 @@ const makeModule = (publicPath: string) => let module: ReturnType -// todo: if seek latency is too slow, because of destroy + init + seek + process, we can use multiple transmuxer already initialized waiting to seek + process -// todo: We can keep in memory all of the chunks needed to initialize the transmuxer +// todo: if seek latency is too slow, because of destroy + init + seek + process, we can use multiple remuxer already initialized waiting to seek + process +// todo: We can keep in memory all of the chunks needed to initialize the remuxer // @ts-ignore const init = makeCallListener(async ( @@ -25,7 +25,8 @@ const init = makeCallListener(async ( publicPath: string length: number bufferSize: number - read: (offset: number, size: number) => Promise + randomRead: (offset: number, size: number) => Promise + getStream: (offset: number) => Promise> write: (params: { offset: number, arrayBuffer: ArrayBuffer, isHeader: boolean, position: number, pts: number, duration: number @@ -52,14 +53,10 @@ const init = makeCallListener(async ( const arraybuffer = uint8.buffer.slice(uint8.byteOffset, uint8.byteOffset + uint8.byteLength) attachment(filename, mimetype, arraybuffer) }, - seek: async (offset: number, whence: SEEK_WHENCE_FLAG) => seek(currentOffset, offset, whence), read: async (offset: number, bufferSize: number) => { const buffer = await read(offset, bufferSize) currentOffset = offset + buffer.byteLength - return { - buffer, - size: buffer.byteLength - } + return buffer }, write: async ( offset: number, buffer: Uint8Array, @@ -86,7 +83,7 @@ const init = makeCallListener(async ( } }) - let transmuxer: ReturnType = makeTransmuxer() + let remuxer: ReturnType = makeTransmuxer() let firstInit = true return { @@ -94,21 +91,21 @@ const init = makeCallListener(async ( currentOffset = 0 currentBuffer = new Uint8Array(0) module = await makeModule(publicPath) - transmuxer = makeTransmuxer() - await transmuxer.init(firstInit) + remuxer = makeTransmuxer() + await remuxer.init() if (firstInit) firstInit = false }, destroy: () => { - transmuxer.destroy() - transmuxer = undefined + remuxer.destroy() + remuxer = undefined module = undefined currentOffset = 0 currentBuffer = new Uint8Array(0) }, - seek: (timestamp: number, flags: SEEK_FLAG) => transmuxer.seek(timestamp, flags), - // todo: For some reason transmuxer was undefined on firefox after a pretty normal seek(not fast seeking or anything), refactor this to prevent issues like this - process: (timeToProcess: number) => transmuxer.process(timeToProcess), - getInfo: () => transmuxer.getInfo() + seek: (timestamp: number, flags: SEEK_FLAG) => remuxer.seek(timestamp, flags), + // todo: For some reason remuxer was undefined on firefox after a pretty normal seek(not fast seeking or anything), refactor this to prevent issues like this + process: (timeToProcess: number) => remuxer.process(timeToProcess), + getInfo: () => remuxer.getInfo() } }) From 1c43051b84e2fe3481236ab5f7ebde78986a0e36 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 21 Jan 2024 16:34:27 +0100 Subject: [PATCH 02/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20up=20?= =?UTF-8?q?issues=20&=20continue=20refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 8 +++--- package.json | 5 ++-- src/index.ts | 12 ++++---- src/main.cpp | 29 +++++++++++-------- src/test.ts | 4 +-- src/worker/index.ts | 69 +++++++++++++++++++++++++++++---------------- 6 files changed, 78 insertions(+), 49 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1b65b32..5a8a946 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "@rollup/plugin-commonjs": "^24.0.1", "concurrently": "^7.6.0", "copyfiles": "^2.4.1", - "nodemon": "^2.0.20", + "nodemon": "^2.0.22", "shx": "^0.3.4", "typescript": "^4.9.4", "vite": "^4.0.4" @@ -820,9 +820,9 @@ } }, "node_modules/nodemon": { - "version": "2.0.20", - "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-2.0.20.tgz", - "integrity": "sha512-Km2mWHKKY5GzRg6i1j5OxOHQtuvVsgskLfigG25yTtbyfRGn/GNvIbRyOf1PSCKJ2aT/58TiuUsuOU5UToVViw==", + "version": "2.0.22", + "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-2.0.22.tgz", + "integrity": "sha512-B8YqaKMmyuCO7BowF1Z1/mkPqLk6cs/l63Ojtd6otKjMx47Dq1utxfRxcavH1I7VSaL8n5BUaoutadnsX3AAVQ==", "dev": true, "dependencies": { "chokidar": "^3.5.2", diff --git a/package.json b/package.json index 68c69be..393c441 100644 --- a/package.json +++ b/package.json @@ -8,12 +8,13 @@ "types": "build/index.d.ts", "type": "module", "scripts": { - "dev": "npm i ./dist && concurrently \"npm run dev-wasm\" \"npm run dev-web\"", + "dev": "npm i ./dist && concurrently \"npm run dev-wasm\" \"npm run dev-web\" \"npm run worker-build-dev\"", "dev-wasm": "nodemon -e cpp --exec \"npm run make-docker\"", "build-worker": "vite build --config vite-worker.config.ts", "make-docker": "docker-compose run libav-wasm make", "dev-web": "vite --port 1234", "vite-build": "vite build", + "worker-build-dev": "nodemon -e ts --watch src/worker --exec \"npm run build-worker\"", "build": "npm run copy-package && npm run make-docker && npm i ./dist && vite build && npm run build-worker && npm run types && npm run copy-wasm && npm remove libav", "types": "tsc", "copy-package": "copyfiles -u 2 ./src/build-config/package.json dist", @@ -24,7 +25,7 @@ "@rollup/plugin-commonjs": "^24.0.1", "concurrently": "^7.6.0", "copyfiles": "^2.4.1", - "nodemon": "^2.0.20", + "nodemon": "^2.0.22", "shx": "^0.3.4", "typescript": "^4.9.4", "vite": "^4.0.4" diff --git a/src/index.ts b/src/index.ts index 0758dc5..ee835c6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,8 +16,8 @@ export type MakeTransmuxerOptions = { /** Path that will be used to locate the javascript worker file */ workerUrl: string workerOptions?: WorkerOptions - read: (offset: number, size: number) => Promise - seek: (currentOffset: number, offset: number, whence: SEEK_WHENCE_FLAG) => Promise + randomRead: (offset: number, size: number) => Promise + getStream: (offset: number) => Promise> subtitle: (title: string, language: string, data: string) => Promise | void attachment: (filename: string, mimetype: string, buffer: ArrayBuffer) => Promise | void write: (params: { @@ -77,8 +77,8 @@ export const makeTransmuxer = async ({ publicPath, workerUrl, workerOptions, - read: _read, - seek: _seek, + randomRead: _randomRead, + getStream: _getStream, write: _write, attachment, subtitle: _subtitle, @@ -155,8 +155,8 @@ export const makeTransmuxer = async ({ _subtitle(subtitle.title, subtitle.language, subtitleString) }, attachment: async (filename, mimetype, buffer) => attachment(filename, mimetype, buffer), - read: (offset, bufferSize) => _read(offset, bufferSize), - seek: (currentOffset, offset, whence) => _seek(currentOffset, offset, whence), + randomRead: (offset, bufferSize) => _randomRead(offset, bufferSize), + getStream: (offset) => _getStream(offset), write: async ({ isHeader, offset, diff --git a/src/main.cpp b/src/main.cpp index 7ddea72..4260957 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -237,7 +237,7 @@ extern "C" { buffer_size, 0, reinterpret_cast(this), - &randomReadFunction, + &readFunction, nullptr, &seekFunction ); @@ -624,18 +624,19 @@ extern "C" { // Seek callback called by AVIOContext static int64_t seekFunction(void* opaque, int64_t offset, int whence) { Remuxer &remuxObject = *reinterpret_cast(opaque); - if (whence === SEEK_WHENCE_FLAG.SEEK_CUR) { - return remuxObject.currentOffset + offset + if (whence == SEEK_CUR) { + return remuxObject.currentOffset + offset; } - if (whence === SEEK_WHENCE_FLAG.SEEK_END) { - return -1 + if (whence == SEEK_END) { + return -1; } - if (whence === SEEK_WHENCE_FLAG.SEEK_SET) { - return offset + if (whence == SEEK_SET) { + return offset; } - if (whence === SEEK_WHENCE_FLAG.AVSEEK_SIZE) { - return remuxObject.input_length + if (whence == AVSEEK_SIZE) { + return remuxObject.input_length; } + return -1; } // If emscripten asynchify ever start working for libraries callbacks, @@ -656,14 +657,20 @@ extern "C" { remuxObject.init_buffer_count++; } } else { + // emscripten::val &randomRead = remuxObject.randomRead; + // buffer = randomRead(static_cast(remuxObject.input_format_context->pb->pos), buf_size).await().as(); emscripten::val &streamRead = remuxObject.streamRead; if (!remuxObject.currentReadStream) { remuxObject.currentReadStream = streamRead(static_cast(remuxObject.input_format_context->pb->pos)).await(); } + emscripten::val cancelled = remuxObject.currentReadStream["closed"].await(); + if (cancelled.as()) { + return AVERROR_EXIT; + } emscripten::val result = remuxObject.currentReadStream["read"]().await(); bool is_done = result["done"].as(); if (is_done) { - return AVERROR_EOF + return AVERROR_EOF; } buffer = result["value"].as(); } @@ -674,7 +681,7 @@ extern "C" { } // copy the result buffer into AVIO's buffer memcpy(buf, (uint8_t*)buffer.c_str(), buffer_size); - remuxObject.currentOffset = remuxObject.currentOffset + buffer.byteLength + remuxObject.currentOffset = remuxObject.currentOffset + buffer_size; // If result buffer size is 0, we reached the end of the file return buffer_size; } diff --git a/src/test.ts b/src/test.ts index 5731c1e..10be319 100644 --- a/src/test.ts +++ b/src/test.ts @@ -87,7 +87,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) workerUrl, bufferSize: BUFFER_SIZE, length: contentLength, - getStream: (offset, size) => + randomRead: (offset, size) => fetch( VIDEO_URL, { @@ -96,7 +96,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } ).then(res => res.arrayBuffer()), - readStream: (offset) => + getStream: (offset) => fetch( VIDEO_URL, { diff --git a/src/worker/index.ts b/src/worker/index.ts index ad897be..ba69548 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -15,12 +15,12 @@ const makeModule = (publicPath: string) => let module: ReturnType -// todo: if seek latency is too slow, because of destroy + init + seek + process, we can use multiple remuxer already initialized waiting to seek + process +// todo: if seek latency is too slow, because of destroy + init + seek + process, we can use multiple remuxer alrandomReady initialized waiting to seek + process // todo: We can keep in memory all of the chunks needed to initialize the remuxer // @ts-ignore const init = makeCallListener(async ( - { publicPath, length, bufferSize, read, seek, write, attachment, subtitle }: + { publicPath, length, bufferSize, randomRead, getStream, write, attachment, subtitle }: { publicPath: string length: number @@ -31,14 +31,17 @@ const init = makeCallListener(async ( offset: number, arrayBuffer: ArrayBuffer, isHeader: boolean, position: number, pts: number, duration: number }) => Promise | void - seek: (currentOffset: number, offset: number, whence: SEEK_WHENCE_FLAG) => Promise subtitle: (streamIndex: number, isHeader: boolean, data: string, ...rest: [number, number] | [string, string]) => Promise attachment: (filename: string, mimetype: string, buffer: ArrayBuffer) => Promise }) => { if (!module) module = await makeModule(publicPath) - let currentOffset = 0 - let currentBuffer = new Uint8Array(0) - const makeTransmuxer = () => new module.Transmuxer({ + + let writeBuffer = new Uint8Array(0) + + let currentStream: ReadableStream | undefined + let reader: ReadableStreamDefaultReader | undefined + + const makeRemuxer = () => new module.Remuxer({ promise: Promise.resolve(), length, bufferSize, @@ -53,9 +56,31 @@ const init = makeCallListener(async ( const arraybuffer = uint8.buffer.slice(uint8.byteOffset, uint8.byteOffset + uint8.byteLength) attachment(filename, mimetype, arraybuffer) }, - read: async (offset: number, bufferSize: number) => { - const buffer = await read(offset, bufferSize) - currentOffset = offset + buffer.byteLength + getStream: async (offset: number) => { + if (currentStream) { + await currentStream.cancel() + currentStream = undefined + reader = undefined + } + const stream = await getStream(offset) + currentStream = stream + reader = stream.getReader() + return { + read: async () => { + const { done, value } = await reader!.read() + if (done) return new Uint8Array(0) + return value + }, + closed: () => reader!.closed, + cancel: async () => { + await reader!.cancel() + currentStream = undefined + reader = undefined + } + } + }, + randomRead: async (offset: number, bufferSize: number) => { + const buffer = await randomRead(offset, bufferSize) return buffer }, write: async ( @@ -63,44 +88,40 @@ const init = makeCallListener(async ( isHeader: boolean, isFlushing: boolean, position: number, pts: number, duration: number ) => { - if (isFlushing && currentBuffer.byteLength > 0) { + if (isFlushing && writeBuffer.byteLength > 0) { write({ isHeader, offset, - arrayBuffer: currentBuffer.buffer, + arrayBuffer: writeBuffer.buffer, position, pts, duration }) - currentBuffer = new Uint8Array(0) + writeBuffer = new Uint8Array(0) if (isHeader) return } - const newBuffer = new Uint8Array(currentBuffer.byteLength + buffer.byteLength) - newBuffer.set(currentBuffer) - newBuffer.set(new Uint8Array(buffer), currentBuffer.byteLength) - currentBuffer = newBuffer + const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) + newBuffer.set(writeBuffer) + newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) + writeBuffer = newBuffer } }) - let remuxer: ReturnType = makeTransmuxer() + let remuxer: ReturnType = makeRemuxer() - let firstInit = true return { init: async () => { - currentOffset = 0 - currentBuffer = new Uint8Array(0) + writeBuffer = new Uint8Array(0) module = await makeModule(publicPath) - remuxer = makeTransmuxer() + remuxer = makeRemuxer() await remuxer.init() - if (firstInit) firstInit = false }, destroy: () => { remuxer.destroy() remuxer = undefined module = undefined - currentOffset = 0 - currentBuffer = new Uint8Array(0) + writeBuffer = new Uint8Array(0) }, seek: (timestamp: number, flags: SEEK_FLAG) => remuxer.seek(timestamp, flags), // todo: For some reason remuxer was undefined on firefox after a pretty normal seek(not fast seeking or anything), refactor this to prevent issues like this From 1bcee7e19dfa555eb024b81395a27c4005fed01e Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 21 Jan 2024 21:39:59 +0100 Subject: [PATCH 03/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fimprove=20w?= =?UTF-8?q?hole=20architecture?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.ts | 80 ++++++------------------------ src/main.cpp | 46 ++++++++--------- src/test.ts | 64 +++++++++++++----------- src/worker/index.ts | 118 +++++++++++++++++++++++++++++--------------- 4 files changed, 153 insertions(+), 155 deletions(-) diff --git a/src/index.ts b/src/index.ts index ee835c6..5f35c49 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,5 @@ import type { Resolvers as WorkerResolvers } from './worker' -import PQueue from 'p-queue' import { call } from 'osra' import { SEEK_FLAG, SEEK_WHENCE_FLAG } from './utils' @@ -99,15 +98,9 @@ export const makeTransmuxer = async ({ const target = call(worker) - const apiQueue = new PQueue() - - const addTask = any>(func: T) => - apiQueue.add>>(func) - const subtitles = new Map() - let lastChunk: Chunk | undefined - const { init: _workerInit, destroy: _workerDestroy, process: _workerProcess, seek: _workerSeek, getInfo: _getInfo } = + const { init: workerInit, destroy: workerDestroy, read: workerRead, seek: workerSeek, getInfo: getInfo } = await target( 'init', { @@ -157,80 +150,35 @@ export const makeTransmuxer = async ({ attachment: async (filename, mimetype, buffer) => attachment(filename, mimetype, buffer), randomRead: (offset, bufferSize) => _randomRead(offset, bufferSize), getStream: (offset) => _getStream(offset), - write: async ({ + write: ({ isHeader, offset, arrayBuffer, position, pts, duration - }) => { - const chunk = { - isHeader, - offset, - buffer: new Uint8Array(arrayBuffer), - pts, - duration, - pos: position - } - - if (!isHeader) { - lastChunk = chunk - processBufferChunks.push(chunk) - } - - await _write(chunk) - } + }) => _write({ + isHeader, + offset, + buffer: new Uint8Array(arrayBuffer), + pts, + duration, + pos: position + }) } ) - const workerQueue = new PQueue({ concurrency: 1 }) - - const addWorkerTask = any>(func: T) => - (...args: Parameters) => - workerQueue.add>>(() => func(...args)) - - const workerInit = addWorkerTask(_workerInit) - const workerDestroy = addWorkerTask(_workerDestroy) - const workerProcess = addWorkerTask(_workerProcess) - const workerSeek = addWorkerTask(_workerSeek) - const getInfo = addWorkerTask(_getInfo) - - let processBufferChunks: Chunk[] = [] - const result = { - init: () => addTask(async () => { - processBufferChunks = [] - await workerInit() - }), + init: () => workerInit(), destroy: (destroyWorker = false) => { if (destroyWorker) { worker.terminate() return } - return addTask(() => workerDestroy()) - }, - process: (timeToProcess: number) => addTask(async () => { - processBufferChunks = [] - await workerProcess(timeToProcess) - const writtenChunks = processBufferChunks - processBufferChunks = [] - return writtenChunks - }), - seek: (time: number) => { - return addTask(async () => { - // if (lastChunk && (lastChunk.pts > time)) { - // await workerDestroy() - // processBufferChunks = [] - // await workerInit() - // } - processBufferChunks = [] - await workerSeek( - Math.max(0, time) * 1000, - SEEK_FLAG.NONE - ) - }) + return workerDestroy() }, + read: () => workerRead(), + seek: (time: number) => workerSeek(Math.max(0, time) * 1000, SEEK_FLAG.NONE), getInfo: () => getInfo() as Promise<{ input: MediaInfo, output: MediaInfo }> } diff --git a/src/main.cpp b/src/main.cpp index 4260957..f4b70ef 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -437,12 +437,12 @@ extern "C" { first_init = false; } - void process(double time_to_process) { + void read() { int res; - double start_process_pts = 0; + bool flushed = false; // loop through the packet frames until we reach the processed size - while (start_process_pts == 0 || (pts < (start_process_pts + time_to_process))) { + while (!flushed) { AVPacket* packet = av_packet_alloc(); if ((res = av_read_frame(input_format_context, packet)) < 0) { @@ -510,14 +510,8 @@ extern "C" { // Rescale the PTS/DTS from the input time base to the output time base av_packet_rescale_ts(packet, in_stream->time_base, out_stream->time_base); - if (!start_process_pts) { - start_process_pts = packet->pts * av_q2d(out_stream->time_base); - } - if (in_stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { - if (!start_process_pts) { - start_process_pts = packet->pts * av_q2d(out_stream->time_base); - } + // printf("pts: %f, prev duration: %f, duration: %f\n", packet->pts * av_q2d(out_stream->time_base), duration, packet->duration * av_q2d(out_stream->time_base)); duration += packet->duration * av_q2d(out_stream->time_base); } @@ -528,10 +522,12 @@ extern "C" { is_header = false; } else { is_flushing = true; + flushed = true; } prev_duration = duration; prev_pts = pts; + // printf("pts: %f, duration: %f\n", pts, duration); prev_pos = pos; duration = 0; @@ -583,7 +579,7 @@ extern "C" { destroy(); init(); - process(0.01); + read(); int res; prev_duration = 0; @@ -649,7 +645,13 @@ extern "C" { if (remuxObject.initializing) { emscripten::val &randomRead = remuxObject.randomRead; if (remuxObject.first_init) { - buffer = randomRead(static_cast(remuxObject.input_format_context->pb->pos), buf_size).await().as(); + buffer = + randomRead( + static_cast(remuxObject.input_format_context->pb->pos), + buf_size + ) + .await() + .as(); remuxObject.init_buffers.push_back(buffer); } else { remuxObject.promise.await(); @@ -657,17 +659,17 @@ extern "C" { remuxObject.init_buffer_count++; } } else { - // emscripten::val &randomRead = remuxObject.randomRead; - // buffer = randomRead(static_cast(remuxObject.input_format_context->pb->pos), buf_size).await().as(); - emscripten::val &streamRead = remuxObject.streamRead; - if (!remuxObject.currentReadStream) { - remuxObject.currentReadStream = streamRead(static_cast(remuxObject.input_format_context->pb->pos)).await(); - } - emscripten::val cancelled = remuxObject.currentReadStream["closed"].await(); - if (cancelled.as()) { + emscripten::val result = + remuxObject + .streamRead( + static_cast(remuxObject.input_format_context->pb->pos), + buf_size + ) + .await(); + bool is_cancelled = result["cancelled"].as(); + if (is_cancelled) { return AVERROR_EXIT; } - emscripten::val result = remuxObject.currentReadStream["read"]().await(); bool is_done = result["done"].as(); if (is_done) { return AVERROR_EOF; @@ -736,7 +738,7 @@ extern "C" { class_("Remuxer") .constructor() .function("init", &Remuxer::init) - .function("process", &Remuxer::process) + .function("read", &Remuxer::read) .function("destroy", &Remuxer::destroy) .function("seek", &Remuxer::_seek) .function("getInfo", &Remuxer::getInfo); diff --git a/src/test.ts b/src/test.ts index 10be319..e971f72 100644 --- a/src/test.ts +++ b/src/test.ts @@ -12,7 +12,7 @@ type Chunk = { pos: number } -const BUFFER_SIZE = 5_000_000 +const BUFFER_SIZE = 2_500_000 const VIDEO_URL = '../video5.mkv' // const VIDEO_URL = '../spidey.mkv' const PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS = 10 @@ -82,7 +82,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) const blob = new Blob([`importScripts(${JSON.stringify(workerUrl2)})`], { type: 'application/javascript' }) const workerUrl = URL.createObjectURL(blob) - const transmuxer = await makeTransmuxer({ + const remuxer = await makeTransmuxer({ publicPath: new URL('/dist/', new URL(import.meta.url).origin).toString(), workerUrl, bufferSize: BUFFER_SIZE, @@ -96,21 +96,21 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } ).then(res => res.arrayBuffer()), - getStream: (offset) => - fetch( - VIDEO_URL, - { - headers: { - Range: `bytes=${offset}` + getStream: (offset) => + fetch( + VIDEO_URL, + { + headers: { + Range: `bytes=${offset}` + } } - } - ).then(res => - toBufferedStream(3)( - toStreamChunkSize(BUFFER_SIZE)( - res.body! - ) - ) - ), + ).then(res => + // toBufferedStream(3)( + toStreamChunkSize(BUFFER_SIZE)( + res.body! + ) + // ) + ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) }, @@ -133,20 +133,12 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } }) - const processingQueue = new PQueue({ concurrency: 1 }) - - const process = (timeToProcess = POST_SEEK_NEEDED_BUFFERS_IN_SECONDS) => - processingQueue.add( - () => ended ? Promise.resolve(undefined) : transmuxer.process(timeToProcess), - { throwOnTimeout: true } - ) - - await transmuxer.init() + await remuxer.init() // @ts-expect-error - if (!headerChunk) throw new Error('No header chunk found after transmuxer init') + if (!headerChunk) throw new Error('No header chunk found after remuxer init') - const mediaInfo = await transmuxer.getInfo() + const mediaInfo = await remuxer.getInfo() const duration = mediaInfo.input.duration / 1_000_000 const video = document.createElement('video') @@ -176,7 +168,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) 'seeking', 'stalled', 'suspend', - 'timeupdate', + // 'timeupdate', 'volumechange', 'waiting' ] @@ -277,4 +269,20 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) video.addEventListener('seeking', () => { }) + + const logAndAppend = async (chunk: Chunk) => { + console.log('res', chunk) + await appendBuffer(chunk.buffer) + } + + await appendBuffer(headerChunk.buffer) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + console.log('ranges', getTimeRanges()) + // console.log((await remuxer.read()).pts) }) diff --git a/src/worker/index.ts b/src/worker/index.ts index ba69548..e36fff9 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,4 +1,5 @@ import { makeCallListener, registerListener } from 'osra' +import type { Chunk } from '..' // @ts-ignore import WASMModule from 'libav' @@ -41,6 +42,14 @@ const init = makeCallListener(async ( let currentStream: ReadableStream | undefined let reader: ReadableStreamDefaultReader | undefined + let streamResultPromiseResolve: (value: ReadableStreamReadResult) => void + let streamResultPromiseReject: (reason?: any) => void + let streamResultPromise: Promise> + + let readResultPromiseResolve: (value: Chunk) => void + let readResultPromiseReject: (reason?: any) => void + let readResultPromise: Promise + const makeRemuxer = () => new module.Remuxer({ promise: Promise.resolve(), length, @@ -56,28 +65,37 @@ const init = makeCallListener(async ( const arraybuffer = uint8.buffer.slice(uint8.byteOffset, uint8.byteOffset + uint8.byteLength) attachment(filename, mimetype, arraybuffer) }, - getStream: async (offset: number) => { - if (currentStream) { - await currentStream.cancel() - currentStream = undefined - reader = undefined - } - const stream = await getStream(offset) - currentStream = stream - reader = stream.getReader() - return { - read: async () => { - const { done, value } = await reader!.read() - if (done) return new Uint8Array(0) - return value - }, - closed: () => reader!.closed, - cancel: async () => { - await reader!.cancel() - currentStream = undefined - reader = undefined - } + streamRead: async (offset: number) => { + // console.log('streamRead', offset) + if (!currentStream) { + currentStream = await getStream(offset) + reader = currentStream.getReader() } + + streamResultPromise = new Promise>((resolve, reject) => { + streamResultPromiseResolve = resolve + streamResultPromiseReject = reject + }) + + reader?.read() + .then((result) => streamResultPromiseResolve(result)) + .catch((err) => streamResultPromiseReject(err)) + + return ( + streamResultPromise + .then((value) => ({ + value: value.value, + done: value.done + })) + .catch(err => { + console.log('streamRead error', err) + return { + buffer: undefined, + done: false, + cancelled: true + } + }) + ) }, randomRead: async (offset: number, bufferSize: number) => { const buffer = await randomRead(offset, bufferSize) @@ -88,23 +106,39 @@ const init = makeCallListener(async ( isHeader: boolean, isFlushing: boolean, position: number, pts: number, duration: number ) => { - if (isFlushing && writeBuffer.byteLength > 0) { - write({ - isHeader, - offset, - arrayBuffer: writeBuffer.buffer, - position, - pts, - duration - }) - writeBuffer = new Uint8Array(0) - if (isHeader) return + if (isHeader) { + if (isFlushing && writeBuffer.byteLength > 0) { + write({ + isHeader, + offset, + arrayBuffer: writeBuffer.buffer, + position, + pts, + duration + }) + writeBuffer = new Uint8Array(0) + if (isHeader) return + } + + + const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) + newBuffer.set(writeBuffer) + newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) + writeBuffer = newBuffer + return } - - const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) - newBuffer.set(writeBuffer) - newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) - writeBuffer = newBuffer + console.log('write', isHeader, isFlushing, pts) + + const newBuffer = new Uint8Array(buffer.byteLength) + newBuffer.set(buffer) + readResultPromiseResolve({ + isHeader: false, + offset, + buffer: newBuffer.buffer as Uint8Array, + pos: position, + pts, + duration + }) } }) @@ -124,8 +158,14 @@ const init = makeCallListener(async ( writeBuffer = new Uint8Array(0) }, seek: (timestamp: number, flags: SEEK_FLAG) => remuxer.seek(timestamp, flags), - // todo: For some reason remuxer was undefined on firefox after a pretty normal seek(not fast seeking or anything), refactor this to prevent issues like this - process: (timeToProcess: number) => remuxer.process(timeToProcess), + read: () => { + readResultPromise = new Promise((resolve, reject) => { + readResultPromiseResolve = resolve + readResultPromiseReject = reject + }) + remuxer.read() + return readResultPromise + }, getInfo: () => remuxer.getInfo() } }) From bbfeb968dde36bb91ac614a3255ae6761c10de84 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 21 Jan 2024 21:59:46 +0100 Subject: [PATCH 04/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fimprove=20w?= =?UTF-8?q?rite=20&=20flush=20architecture?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 33 ++++++++++-------------------- src/test.ts | 34 ++++++++++++++++-------------- src/worker/index.ts | 50 +++++++++++++++++++-------------------------- 3 files changed, 51 insertions(+), 66 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index f4b70ef..a946aac 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -51,7 +51,6 @@ extern "C" { int video_stream_index; bool is_header; - bool is_flushing; long prev_pos; double prev_pts; double prev_duration; @@ -71,6 +70,7 @@ extern "C" { val attachment = val::undefined(); val subtitle = val::undefined(); val write = val::undefined(); + val flush = val::undefined(); val error = val::undefined(); double currentOffset = 0; @@ -102,6 +102,7 @@ extern "C" { attachment = options["attachment"]; subtitle = options["subtitle"]; write = options["write"]; + flush = options["flush"]; error = options["error"]; } @@ -422,11 +423,8 @@ extern "C" { return; } if (first_init) { - write( + flush( static_cast(input_format_context->pb->pos), - NULL, - is_header, - true, 0, 0, 0 @@ -448,13 +446,9 @@ extern "C" { if ((res = av_read_frame(input_format_context, packet)) < 0) { if (res == AVERROR_EOF) { avio_flush(output_format_context->pb); - is_flushing = true; av_write_trailer(output_format_context); - write( + flush( static_cast(input_format_context->pb->pos), - NULL, - is_header, - true, pos, pts, duration @@ -521,7 +515,12 @@ extern "C" { if (was_header) { is_header = false; } else { - is_flushing = true; + flush( + static_cast(input_format_context->pb->pos), + prev_pos, + prev_pts, + prev_duration + ).await(); flushed = true; } @@ -700,24 +699,14 @@ extern "C" { // call the JS write function write( - static_cast(remuxObject.input_format_context->pb->pos), emscripten::val( emscripten::typed_memory_view( buf_size, buf ) - ), - remuxObject.is_header, - remuxObject.is_flushing, - remuxObject.prev_pos, - remuxObject.prev_pts, - remuxObject.prev_duration + ) ).await(); - if (remuxObject.is_flushing) { - remuxObject.is_flushing = false; - } - return buf_size; } diff --git a/src/test.ts b/src/test.ts index e971f72..f8f7824 100644 --- a/src/test.ts +++ b/src/test.ts @@ -75,7 +75,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) ? Number(contentRangeContentLength) : Number(headers.get('Content-Length')) - let headerChunk: Chunk + // let headerChunk: Chunk let ended = false const workerUrl2 = new URL('../build/worker.js', import.meta.url).toString() @@ -118,24 +118,23 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) // console.log('attachment', filename, mimetype, buffer) }, write: ({ isHeader, offset, buffer, pts, duration: chunkDuration, pos }) => { - if (isHeader) { - if (!headerChunk) { - headerChunk = { - offset, - buffer: new Uint8Array(buffer), - pts, - duration: chunkDuration, - pos - } - } - return - } + // if (isHeader) { + // if (!headerChunk) { + // headerChunk = { + // offset, + // buffer: new Uint8Array(buffer), + // pts, + // duration: chunkDuration, + // pos + // } + // } + // return + // } } }) - await remuxer.init() + const headerChunk = await remuxer.init() - // @ts-expect-error if (!headerChunk) throw new Error('No header chunk found after remuxer init') const mediaInfo = await remuxer.getInfo() @@ -276,6 +275,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } await appendBuffer(headerChunk.buffer) + + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) await logAndAppend((await remuxer.read())) await logAndAppend((await remuxer.read())) await logAndAppend((await remuxer.read())) diff --git a/src/worker/index.ts b/src/worker/index.ts index e36fff9..35f61b9 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -101,36 +101,21 @@ const init = makeCallListener(async ( const buffer = await randomRead(offset, bufferSize) return buffer }, - write: async ( - offset: number, buffer: Uint8Array, - isHeader: boolean, isFlushing: boolean, - position: number, pts: number, duration: number + write: async (buffer: Uint8Array) => { + console.log('buffer', buffer.byteLength) + console.log('writeBuffer', writeBuffer.byteLength) + const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) + newBuffer.set(writeBuffer) + newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) + writeBuffer = newBuffer + }, + flush: async ( + offset: number, position: number, + pts: number, duration: number ) => { - if (isHeader) { - if (isFlushing && writeBuffer.byteLength > 0) { - write({ - isHeader, - offset, - arrayBuffer: writeBuffer.buffer, - position, - pts, - duration - }) - writeBuffer = new Uint8Array(0) - if (isHeader) return - } - - - const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) - newBuffer.set(writeBuffer) - newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) - writeBuffer = newBuffer - return - } - console.log('write', isHeader, isFlushing, pts) - - const newBuffer = new Uint8Array(buffer.byteLength) - newBuffer.set(buffer) + console.log('flush', writeBuffer.byteLength) + const newBuffer = new Uint8Array(writeBuffer.byteLength) + newBuffer.set(writeBuffer) readResultPromiseResolve({ isHeader: false, offset, @@ -139,6 +124,7 @@ const init = makeCallListener(async ( pts, duration }) + writeBuffer = new Uint8Array(0) } }) @@ -149,7 +135,13 @@ const init = makeCallListener(async ( writeBuffer = new Uint8Array(0) module = await makeModule(publicPath) remuxer = makeRemuxer() + + readResultPromise = new Promise((resolve, reject) => { + readResultPromiseResolve = resolve + readResultPromiseReject = reject + }) await remuxer.init() + return readResultPromise }, destroy: () => { remuxer.destroy() From 8fb1339c0dca74fc35e8e5846c8d978f03ecbab5 Mon Sep 17 00:00:00 2001 From: Banou Date: Mon, 22 Jan 2024 00:11:32 +0100 Subject: [PATCH 05/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20first?= =?UTF-8?q?=20read?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 20 ++++++++++++++------ src/test.ts | 31 +++++++++++++++++++------------ src/utils.ts | 1 + src/worker/index.ts | 9 ++++----- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index a946aac..8d3699e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -51,6 +51,7 @@ extern "C" { int video_stream_index; bool is_header; + bool is_flushing; long prev_pos; double prev_pts; double prev_duration; @@ -429,6 +430,7 @@ extern "C" { 0, 0 ); + is_flushing = false; } initializing = false; @@ -446,6 +448,7 @@ extern "C" { if ((res = av_read_frame(input_format_context, packet)) < 0) { if (res == AVERROR_EOF) { avio_flush(output_format_context->pb); + is_flushing = true; av_write_trailer(output_format_context); flush( static_cast(input_format_context->pb->pos), @@ -515,12 +518,7 @@ extern "C" { if (was_header) { is_header = false; } else { - flush( - static_cast(input_format_context->pb->pos), - prev_pos, - prev_pts, - prev_duration - ).await(); + is_flushing = true; flushed = true; } @@ -541,6 +539,16 @@ extern "C" { continue; } + if (is_flushing) { + flush( + static_cast(input_format_context->pb->pos), + prev_pos, + prev_pts, + prev_duration + ).await(); + is_flushing = false; + } + // free packet av_packet_unref(packet); av_packet_free(&packet); diff --git a/src/test.ts b/src/test.ts index f8f7824..d42d6bb 100644 --- a/src/test.ts +++ b/src/test.ts @@ -101,15 +101,15 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) VIDEO_URL, { headers: { - Range: `bytes=${offset}` + Range: `bytes=${offset}-` } } ).then(res => - // toBufferedStream(3)( + toBufferedStream(3)( toStreamChunkSize(BUFFER_SIZE)( res.body! ) - // ) + ) ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) @@ -274,18 +274,25 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(chunk.buffer) } + const BUFFER_COUNT = 2 + await appendBuffer(headerChunk.buffer) + console.log('HEADER APPENDED') + await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + console.log('FIRST BUFFER APPENDED') + + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) console.log('ranges', getTimeRanges()) // console.log((await remuxer.read()).pts) diff --git a/src/utils.ts b/src/utils.ts index 2431137..1ed65b1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -145,6 +145,7 @@ export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) => const { leftOverData }: { leftOverData: Uint8Array | undefined } = this const accumulate = async ({ buffer = new Uint8Array(SIZE), currentSize = 0 } = {}): Promise<{ buffer?: Uint8Array, currentSize?: number, done: boolean }> => { + console.log('accumulate') const { value: newBuffer, done } = await this.reader!.read() if (currentSize === 0 && leftOverData) { diff --git a/src/worker/index.ts b/src/worker/index.ts index 35f61b9..a9ccd65 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -66,7 +66,6 @@ const init = makeCallListener(async ( attachment(filename, mimetype, arraybuffer) }, streamRead: async (offset: number) => { - // console.log('streamRead', offset) if (!currentStream) { currentStream = await getStream(offset) reader = currentStream.getReader() @@ -102,8 +101,8 @@ const init = makeCallListener(async ( return buffer }, write: async (buffer: Uint8Array) => { - console.log('buffer', buffer.byteLength) - console.log('writeBuffer', writeBuffer.byteLength) + console.log('buffer', buffer) + console.log('writeBuffer', writeBuffer) const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -113,7 +112,7 @@ const init = makeCallListener(async ( offset: number, position: number, pts: number, duration: number ) => { - console.log('flush', writeBuffer.byteLength) + console.log('flush', writeBuffer) const newBuffer = new Uint8Array(writeBuffer.byteLength) newBuffer.set(writeBuffer) readResultPromiseResolve({ @@ -140,7 +139,7 @@ const init = makeCallListener(async ( readResultPromiseResolve = resolve readResultPromiseReject = reject }) - await remuxer.init() + remuxer.init() return readResultPromise }, destroy: () => { From 3af81094d92549966fe7fc4bd5377b6992132ed8 Mon Sep 17 00:00:00 2001 From: Banou Date: Mon, 22 Jan 2024 00:52:56 +0100 Subject: [PATCH 06/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fimprove=20b?= =?UTF-8?q?uffering=20&=20flushing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 6 ++++++ src/test.ts | 9 +++------ src/worker/index.ts | 5 +++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 8d3699e..1393c00 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -519,6 +519,12 @@ extern "C" { is_header = false; } else { is_flushing = true; + flush( + static_cast(input_format_context->pb->pos), + prev_pos, + prev_pts, + prev_duration + ).await(); flushed = true; } diff --git a/src/test.ts b/src/test.ts index d42d6bb..08fb647 100644 --- a/src/test.ts +++ b/src/test.ts @@ -278,14 +278,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(headerChunk.buffer) - console.log('HEADER APPENDED') await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - console.log('FIRST BUFFER APPENDED') - - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) + await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) diff --git a/src/worker/index.ts b/src/worker/index.ts index a9ccd65..3cb35d5 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -101,8 +101,8 @@ const init = makeCallListener(async ( return buffer }, write: async (buffer: Uint8Array) => { - console.log('buffer', buffer) - console.log('writeBuffer', writeBuffer) + // console.log('buffer', buffer) + // console.log('writeBuffer', writeBuffer) const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -112,6 +112,7 @@ const init = makeCallListener(async ( offset: number, position: number, pts: number, duration: number ) => { + if (!writeBuffer.byteLength) return console.log('flush', writeBuffer) const newBuffer = new Uint8Array(writeBuffer.byteLength) newBuffer.set(writeBuffer) From 3f7fc358dad12e7cba6ae5967de717e59787ef2e Mon Sep 17 00:00:00 2001 From: Banou Date: Mon, 22 Jan 2024 02:01:12 +0100 Subject: [PATCH 07/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fstart=20imp?= =?UTF-8?q?l=20update=20buffers=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test.ts | 59 ++++++++++++++++++++++++++++++++++++++++----- src/worker/index.ts | 5 ++-- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/test.ts b/src/test.ts index 08fb647..5133194 100644 --- a/src/test.ts +++ b/src/test.ts @@ -105,11 +105,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } ).then(res => - toBufferedStream(3)( + // toBufferedStream(3)( toStreamChunkSize(BUFFER_SIZE)( res.body! ) - ) + // ) ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) @@ -274,15 +274,62 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(chunk.buffer) } + let chunks: Chunk[] = [] + + const PREVIOUS_BUFFER_COUNT = 1 const BUFFER_COUNT = 2 await appendBuffer(headerChunk.buffer) + const pull = async () => { + const chunk = await remuxer.read() + chunks = [...chunks, chunk] + return chunk + } + + const updateBuffers = queuedDebounceWithLastCall(500, async () => { + const { currentTime } = video + const currentChunkIndex = chunks.findIndex(({ pts, duration }) => pts <= currentTime && pts + duration >= currentTime) + + // buffer needed chunks to be at BUFFER_COUNT + for (let i = 0; i < currentChunkIndex + BUFFER_COUNT; i++) { + if (chunks[i]) continue + const chunk = await pull() + await appendBuffer(chunk.buffer) + } + + // remove chunks before currentChunkIndex - PREVIOUS_BUFFER_COUNT + for (let i = 0; i < currentChunkIndex - PREVIOUS_BUFFER_COUNT; i++) { + if (!chunks[i]) continue + const { duration } = chunks[i] + await unbufferRange(0, duration) + chunks = chunks.filter((_, index) => index !== i) + } + + // for (let i = currentChunkIndex - PREVIOUS_BUFFER_COUNT; i >= 0; i--) { + // if (!chunks[i]) continue + // const { pts, duration } = chunks[i] + // const start = pts + // const end = pts + duration + // await unbufferRange(start, end) + // chunks = chunks.filter((_, index) => index !== i) + // } + + console.log('chunks', chunks) + }) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) - await logAndAppend((await remuxer.read())) + appendBuffer((await pull()).buffer) + + video.addEventListener('timeupdate', () => { + updateBuffers() + }) + + updateBuffers() + + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) + // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) diff --git a/src/worker/index.ts b/src/worker/index.ts index 3cb35d5..ea96357 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -76,9 +76,10 @@ const init = makeCallListener(async ( streamResultPromiseReject = reject }) + console.log('read') reader?.read() - .then((result) => streamResultPromiseResolve(result)) - .catch((err) => streamResultPromiseReject(err)) + .then((result) => console.log('read result', result) || streamResultPromiseResolve(result)) + .catch((err) => console.log('read err', err) || streamResultPromiseReject(err)) return ( streamResultPromise From b50e9a7785514b1f64aeed014f667d304d52cfe0 Mon Sep 17 00:00:00 2001 From: Banou Date: Mon, 22 Jan 2024 03:05:11 +0100 Subject: [PATCH 08/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20toBuf?= =?UTF-8?q?feredStream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test.ts | 6 +++--- src/utils.ts | 15 ++++++++++----- src/worker/index.ts | 5 +++-- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/test.ts b/src/test.ts index 5133194..e2ddded 100644 --- a/src/test.ts +++ b/src/test.ts @@ -105,11 +105,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } ).then(res => - // toBufferedStream(3)( + toBufferedStream(3)( toStreamChunkSize(BUFFER_SIZE)( res.body! ) - // ) + ) ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) @@ -315,7 +315,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) // chunks = chunks.filter((_, index) => index !== i) // } - console.log('chunks', chunks) + // console.log('chunks', chunks) }) appendBuffer((await pull()).buffer) diff --git a/src/utils.ts b/src/utils.ts index 1ed65b1..cfa7d5c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -145,7 +145,6 @@ export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) => const { leftOverData }: { leftOverData: Uint8Array | undefined } = this const accumulate = async ({ buffer = new Uint8Array(SIZE), currentSize = 0 } = {}): Promise<{ buffer?: Uint8Array, currentSize?: number, done: boolean }> => { - console.log('accumulate') const { value: newBuffer, done } = await this.reader!.read() if (currentSize === 0 && leftOverData) { @@ -185,12 +184,16 @@ export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) => currentPullPromise: undefined, reader: undefined, leftOverData: undefined, + closed: false, start() { this.reader = stream.getReader() + this.reader.closed.then(() => { + this.closed = true + }) }, async pull(controller) { const pull = async () => { - if (await this.reader!.closed) return + if (this.closed) return if (this.buffers.length >= SIZE) return this.currentPullPromise = this.reader!.read() const { value: newBuffer, done } = await this.currentPullPromise @@ -208,11 +211,12 @@ export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) => if (this.buffers.length === 0) { await pull() + return tryToBuffer() } else { - pull() + pull().then(() => { + tryToBuffer() + }) } - - return tryToBuffer() } await tryToBuffer() @@ -227,4 +231,5 @@ export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) => leftOverData: Uint8Array | undefined buffers: Uint8Array[] currentPullPromise: Promise> | undefined + closed: boolean }) diff --git a/src/worker/index.ts b/src/worker/index.ts index ea96357..a838121 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -76,10 +76,11 @@ const init = makeCallListener(async ( streamResultPromiseReject = reject }) + const p = performance.now() console.log('read') reader?.read() - .then((result) => console.log('read result', result) || streamResultPromiseResolve(result)) - .catch((err) => console.log('read err', err) || streamResultPromiseReject(err)) + .then((result) => console.log('read result', result, performance.now() - p) || streamResultPromiseResolve(result)) + .catch((err) => console.log('read err', err, performance.now() - p) || streamResultPromiseReject(err)) return ( streamResultPromise From 0c2e5fdd832bec8274433a1e70be68ea2e69dec7 Mon Sep 17 00:00:00 2001 From: Banou Date: Mon, 22 Jan 2024 05:06:25 +0100 Subject: [PATCH 09/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Funbuffer=20?= =?UTF-8?q?past=20chunks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 4 ++++ src/test.ts | 45 ++++++++++++++++++++++++++++----------------- src/worker/index.ts | 16 ++++++++++------ 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 1393c00..1a29265 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -696,6 +696,10 @@ extern "C" { } // copy the result buffer into AVIO's buffer memcpy(buf, (uint8_t*)buffer.c_str(), buffer_size); + + // free the result buffer + buffer.clear(); + remuxObject.currentOffset = remuxObject.currentOffset + buffer_size; // If result buffer size is 0, we reached the end of the file return buffer_size; diff --git a/src/test.ts b/src/test.ts index e2ddded..c2e9ce8 100644 --- a/src/test.ts +++ b/src/test.ts @@ -291,31 +291,38 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) const { currentTime } = video const currentChunkIndex = chunks.findIndex(({ pts, duration }) => pts <= currentTime && pts + duration >= currentTime) - // buffer needed chunks to be at BUFFER_COUNT for (let i = 0; i < currentChunkIndex + BUFFER_COUNT; i++) { if (chunks[i]) continue const chunk = await pull() await appendBuffer(chunk.buffer) } - // remove chunks before currentChunkIndex - PREVIOUS_BUFFER_COUNT - for (let i = 0; i < currentChunkIndex - PREVIOUS_BUFFER_COUNT; i++) { - if (!chunks[i]) continue - const { duration } = chunks[i] - await unbufferRange(0, duration) - chunks = chunks.filter((_, index) => index !== i) + const sliceIndex = Math.max(0, currentChunkIndex - PREVIOUS_BUFFER_COUNT) + if (sliceIndex) chunks = chunks.slice(sliceIndex) + + const bufferedRanges = getTimeRanges() + + const firstChunk = chunks.at(0) + const lastChunk = chunks.at(-1) + if (!firstChunk || !lastChunk || firstChunk === lastChunk) return + const minTime = firstChunk.pts + + for (const { start, end } of bufferedRanges) { + const chunkIndex = chunks.findIndex(({ pts, duration }) => start <= (pts + (duration / 2)) && (pts + (duration / 2)) <= end) + if (chunkIndex === -1) { + await unbufferRange(start, end) + } else { + if (start < minTime) { + await unbufferRange( + start, + minTime + ) + } + } } - // for (let i = currentChunkIndex - PREVIOUS_BUFFER_COUNT; i >= 0; i--) { - // if (!chunks[i]) continue - // const { pts, duration } = chunks[i] - // const start = pts - // const end = pts + duration - // await unbufferRange(start, end) - // chunks = chunks.filter((_, index) => index !== i) - // } - - // console.log('chunks', chunks) + // console.log('chunks', ...chunks) + // console.log('ranges2', ...bufferedRanges) }) appendBuffer((await pull()).buffer) @@ -326,6 +333,10 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) updateBuffers() + setTimeout(() => { + video.playbackRate = 5 + }, 100) + // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) // await logAndAppend((await remuxer.read())) diff --git a/src/worker/index.ts b/src/worker/index.ts index a838121..2b1bd11 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -76,11 +76,15 @@ const init = makeCallListener(async ( streamResultPromiseReject = reject }) - const p = performance.now() - console.log('read') + // const p = performance.now() + // console.log('read') + // reader?.read() + // .then((result) => console.log('read result', result, performance.now() - p) || streamResultPromiseResolve(result)) + // .catch((err) => console.log('read err', err, performance.now() - p) || streamResultPromiseReject(err)) + reader?.read() - .then((result) => console.log('read result', result, performance.now() - p) || streamResultPromiseResolve(result)) - .catch((err) => console.log('read err', err, performance.now() - p) || streamResultPromiseReject(err)) + .then((result) => streamResultPromiseResolve(result)) + .catch((err) => streamResultPromiseReject(err)) return ( streamResultPromise @@ -89,7 +93,7 @@ const init = makeCallListener(async ( done: value.done })) .catch(err => { - console.log('streamRead error', err) + // console.log('streamRead error', err) return { buffer: undefined, done: false, @@ -115,7 +119,7 @@ const init = makeCallListener(async ( pts: number, duration: number ) => { if (!writeBuffer.byteLength) return - console.log('flush', writeBuffer) + // console.log('flush', writeBuffer) const newBuffer = new Uint8Array(writeBuffer.byteLength) newBuffer.set(writeBuffer) readResultPromiseResolve({ From e0d6e5edcd8863da3693a90743de9ed599693a12 Mon Sep 17 00:00:00 2001 From: Banou Date: Tue, 23 Jan 2024 05:19:43 +0100 Subject: [PATCH 10/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fimpove=20th?= =?UTF-8?q?e=20code=20a=20little=20bit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 1 + src/main.cpp | 4 +--- src/test.ts | 8 +++++++- src/worker/index.ts | 27 +++++++++++++-------------- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index e13a129..73c540b 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ dist/libav-wasm.js: -s INITIAL_MEMORY=150mb \ -s TOTAL_MEMORY=125mb \ -s STACK_SIZE=50mb \ + -s ALLOW_MEMORY_GROWTH=1 \ -s ASYNCIFY \ -s MODULARIZE=1 \ -g \ diff --git a/src/main.cpp b/src/main.cpp index 1a29265..39dc5f4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -553,6 +553,7 @@ extern "C" { prev_duration ).await(); is_flushing = false; + flushed = true; } // free packet @@ -697,9 +698,6 @@ extern "C" { // copy the result buffer into AVIO's buffer memcpy(buf, (uint8_t*)buffer.c_str(), buffer_size); - // free the result buffer - buffer.clear(); - remuxObject.currentOffset = remuxObject.currentOffset + buffer_size; // If result buffer size is 0, we reached the end of the file return buffer_size; diff --git a/src/test.ts b/src/test.ts index c2e9ce8..a976c09 100644 --- a/src/test.ts +++ b/src/test.ts @@ -277,7 +277,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) let chunks: Chunk[] = [] const PREVIOUS_BUFFER_COUNT = 1 - const BUFFER_COUNT = 2 + const BUFFER_COUNT = 5 await appendBuffer(headerChunk.buffer) @@ -333,6 +333,12 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) updateBuffers() + // while (1) { + // await new Promise(resolve => setTimeout(resolve, 100)) + // console.log('pull') + // await pull() + // } + setTimeout(() => { video.playbackRate = 5 }, 100) diff --git a/src/worker/index.ts b/src/worker/index.ts index 2b1bd11..e73424c 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -76,26 +76,28 @@ const init = makeCallListener(async ( streamResultPromiseReject = reject }) - // const p = performance.now() - // console.log('read') - // reader?.read() - // .then((result) => console.log('read result', result, performance.now() - p) || streamResultPromiseResolve(result)) - // .catch((err) => console.log('read err', err, performance.now() - p) || streamResultPromiseReject(err)) - reader?.read() .then((result) => streamResultPromiseResolve(result)) .catch((err) => streamResultPromiseReject(err)) return ( streamResultPromise + // .then((value) => { + // const newBuffer = new Uint8Array(value.value.byteLength) + // newBuffer.set(value.value) + + // return { + // value: newBuffer, + // done: value.done + // } + // }) .then((value) => ({ value: value.value, done: value.done })) .catch(err => { - // console.log('streamRead error', err) return { - buffer: undefined, + value: undefined, done: false, cancelled: true } @@ -107,8 +109,7 @@ const init = makeCallListener(async ( return buffer }, write: async (buffer: Uint8Array) => { - // console.log('buffer', buffer) - // console.log('writeBuffer', writeBuffer) + console.log('WRITE') const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -119,13 +120,11 @@ const init = makeCallListener(async ( pts: number, duration: number ) => { if (!writeBuffer.byteLength) return - // console.log('flush', writeBuffer) - const newBuffer = new Uint8Array(writeBuffer.byteLength) - newBuffer.set(writeBuffer) + console.log('FLUSH', pts, duration) readResultPromiseResolve({ isHeader: false, offset, - buffer: newBuffer.buffer as Uint8Array, + buffer: writeBuffer.buffer as Uint8Array, pos: position, pts, duration From d836691b93228d0baba3dba92954fe2b85369605 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 02:42:22 +0100 Subject: [PATCH 11/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ftry=20to=20?= =?UTF-8?q?debug=20seeking=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 97 +++++++++++++++++++------------- src/test.ts | 133 +++++++++++++++++++++++++++++++++----------- src/utils.ts | 2 +- src/worker/index.ts | 22 ++++---- 4 files changed, 169 insertions(+), 85 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 39dc5f4..669a79e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -67,6 +67,7 @@ extern "C" { val streamRead = val::undefined(); val currentReadStream = val::undefined(); + val clearStream = val::undefined(); val attachment = val::undefined(); val subtitle = val::undefined(); @@ -100,6 +101,7 @@ extern "C" { buffer_size = options["bufferSize"].as(); randomRead = options["randomRead"]; streamRead = options["streamRead"]; + clearStream = options["clearStream"]; attachment = options["attachment"]; subtitle = options["subtitle"]; write = options["write"]; @@ -530,7 +532,7 @@ extern "C" { prev_duration = duration; prev_pts = pts; - // printf("pts: %f, duration: %f\n", pts, duration); + printf("pts: %f, duration: %f\n", prev_pts, duration); prev_pos = pos; duration = 0; @@ -591,17 +593,18 @@ extern "C" { int _seek(int timestamp, int flags) { destroy(); - init(); - - read(); - - int res; prev_duration = 0; prev_pts = 0; prev_pos = 0; duration = 0; pts = 0; pos = 0; + init(); + read(); + + clearStream().await(); + + int res; if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, flags)) < 0) { printf("ERROR: could not seek frame | %s \n", av_err2str(res)); } @@ -656,40 +659,54 @@ extern "C" { static int readFunction(void* opaque, uint8_t* buf, int buf_size) { Remuxer &remuxObject = *reinterpret_cast(opaque); std::string buffer; - if (remuxObject.initializing) { - emscripten::val &randomRead = remuxObject.randomRead; - if (remuxObject.first_init) { - buffer = - randomRead( - static_cast(remuxObject.input_format_context->pb->pos), - buf_size - ) - .await() - .as(); - remuxObject.init_buffers.push_back(buffer); - } else { - remuxObject.promise.await(); - buffer = remuxObject.init_buffers[remuxObject.init_buffer_count]; - remuxObject.init_buffer_count++; - } - } else { - emscripten::val result = - remuxObject - .streamRead( - static_cast(remuxObject.input_format_context->pb->pos), - buf_size - ) - .await(); - bool is_cancelled = result["cancelled"].as(); - if (is_cancelled) { - return AVERROR_EXIT; - } - bool is_done = result["done"].as(); - if (is_done) { - return AVERROR_EOF; - } - buffer = result["value"].as(); - } + emscripten::val &randomRead = remuxObject.randomRead; + buffer = + randomRead( + static_cast(remuxObject.input_format_context->pb->pos), + buf_size + ) + .await() + .as(); + // if (remuxObject.initializing) { + // emscripten::val &randomRead = remuxObject.randomRead; + // if (remuxObject.first_init) { + // printf("first init\n"); + // buffer = + // randomRead( + // static_cast(remuxObject.input_format_context->pb->pos), + // buf_size + // ) + // .await() + // .as(); + // printf("first init 2 %d\n", buffer.size()); + // remuxObject.init_buffers.push_back(buffer); + // } else { + // printf("first init 3\n"); + // remuxObject.promise.await(); + // buffer = remuxObject.init_buffers[remuxObject.init_buffer_count]; + // printf("first init 4 %d\n", buffer.size()); + // remuxObject.init_buffer_count++; + // } + // } else { + // printf("first init 5\n"); + // emscripten::val result = + // remuxObject + // .streamRead( + // static_cast(remuxObject.input_format_context->pb->pos), + // buf_size + // ) + // .await(); + // printf("first init 6\n"); + // bool is_cancelled = result["cancelled"].as(); + // if (is_cancelled) { + // return AVERROR_EXIT; + // } + // bool is_done = result["done"].as(); + // if (is_done) { + // return AVERROR_EOF; + // } + // buffer = result["value"].as(); + // } int buffer_size = buffer.size(); if (buffer_size == 0) { diff --git a/src/test.ts b/src/test.ts index a976c09..1e9b1f9 100644 --- a/src/test.ts +++ b/src/test.ts @@ -256,19 +256,15 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) end: sourceBuffer.buffered.end(index) })) - video.addEventListener('timeupdate', () => { - seconds.textContent = video.currentTime.toString() - }) + // video.addEventListener('timeupdate', () => { + // seconds.textContent = video.currentTime.toString() + // }) video.addEventListener('canplaythrough', () => { video.playbackRate = 1 video.play() }, { once: true }) - video.addEventListener('seeking', () => { - - }) - const logAndAppend = async (chunk: Chunk) => { console.log('res', chunk) await appendBuffer(chunk.buffer) @@ -282,22 +278,31 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(headerChunk.buffer) const pull = async () => { + console.log('read') const chunk = await remuxer.read() + console.log('read', chunk) chunks = [...chunks, chunk] return chunk } - const updateBuffers = queuedDebounceWithLastCall(500, async () => { + let seeking = false + + const updateBuffers = queuedDebounceWithLastCall(250, async () => { + if (seeking) return const { currentTime } = video const currentChunkIndex = chunks.findIndex(({ pts, duration }) => pts <= currentTime && pts + duration >= currentTime) + const sliceIndex = Math.max(0, currentChunkIndex - PREVIOUS_BUFFER_COUNT) - for (let i = 0; i < currentChunkIndex + BUFFER_COUNT; i++) { + console.log('currentChunkIndex', currentChunkIndex, chunks.length, currentTime) + for (let i = 0; i < sliceIndex + BUFFER_COUNT; i++) { + console.log('pull check', i, chunks[i]) if (chunks[i]) continue + console.log('pulling') const chunk = await pull() + console.log('pull', chunk) await appendBuffer(chunk.buffer) } - const sliceIndex = Math.max(0, currentChunkIndex - PREVIOUS_BUFFER_COUNT) if (sliceIndex) chunks = chunks.slice(sliceIndex) const bufferedRanges = getTimeRanges() @@ -320,9 +325,61 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } } + }) + + console.log('SOURCE BUFFER', sourceBuffer) + console.log('APPEND BUFFER', appendBuffer) + + const seek = queuedDebounceWithLastCall(500, async (seekTime: number) => { + seeking = true + + await video.pause() + await new Promise(resolve => setTimeout(resolve, 100)) + + const bufferedRanges = getTimeRanges() + for (const { start, end } of bufferedRanges) { + await unbufferRange(start, end) + } - // console.log('chunks', ...chunks) - // console.log('ranges2', ...bufferedRanges) + await appendBuffer(headerChunk.buffer) + + chunks = [] + console.log('seek', seekTime) + await remuxer.seek(seekTime - 20) + + await new Promise(resolve => setTimeout(resolve, 100)) + console.log('seek before append', getTimeRanges()) + + + const chunk1 = await pull() + await appendBuffer(chunk1.buffer) + console.log('appended buffer', chunk1) + await new Promise(resolve => setTimeout(resolve, 100)) + const chunk2 = await pull() + await appendBuffer(chunk2.buffer) + console.log('appended buffer', chunk2) + await new Promise(resolve => setTimeout(resolve, 100)) + const chunk3 = await pull() + await appendBuffer(chunk3.buffer) + console.log('appended buffer', chunk3) + await new Promise(resolve => setTimeout(resolve, 100)) + const chunk4 = await pull() + await appendBuffer(chunk4.buffer) + console.log('appended buffer', chunk4) + console.log('seeked') + // await updateBuffers() + console.log('seeked2', chunks, getTimeRanges()) + await new Promise(resolve => setTimeout(resolve, 100)) + video.currentTime = seekTime + await new Promise(resolve => setTimeout(resolve, 0)) + seeking = false + console.log('finished seeking') + // await video.play() + + + // for (const chunk of chunks) { + // await appendBuffer(chunk.buffer) + // } }) appendBuffer((await pull()).buffer) @@ -331,30 +388,38 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) updateBuffers() }) - updateBuffers() + video.addEventListener('waiting', () => { + updateBuffers() + }) - // while (1) { - // await new Promise(resolve => setTimeout(resolve, 100)) - // console.log('pull') - // await pull() - // } + video.addEventListener('seeking', (ev) => { + if (seeking) return + seek(video.currentTime) + }) + + video.addEventListener('progress', () => { + console.log('time ranges', getTimeRanges(), chunks) + }) - setTimeout(() => { - video.playbackRate = 5 + updateBuffers() + + setInterval(() => { + seconds.textContent = video.currentTime.toString() }, 100) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - // await logAndAppend((await remuxer.read())) - console.log('ranges', getTimeRanges()) - // console.log((await remuxer.read()).pts) + // setTimeout(() => { + // setInterval(async () => { + // console.log('time ranges', getTimeRanges(), chunks) + // // for (const chunk of chunks) { + // // await appendBuffer(chunk.buffer) + // // } + // }, 1000) + // }, 3000) + + setTimeout(async () => { + await video.pause() + await new Promise(resolve => setTimeout(resolve, 1000)) + // video.playbackRate = 5 + video.currentTime = 400 + }, 2000) }) diff --git a/src/utils.ts b/src/utils.ts index cfa7d5c..cd6c330 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -207,7 +207,7 @@ export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) => } const tryToBuffer = async (): Promise => { - if (this.buffers.length >= SIZE) return + if (this.buffers.length >= SIZE || this.closed) return if (this.buffers.length === 0) { await pull() diff --git a/src/worker/index.ts b/src/worker/index.ts index e73424c..e468b07 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -67,6 +67,7 @@ const init = makeCallListener(async ( }, streamRead: async (offset: number) => { if (!currentStream) { + console.log('create stream', offset) currentStream = await getStream(offset) reader = currentStream.getReader() } @@ -77,20 +78,16 @@ const init = makeCallListener(async ( }) reader?.read() - .then((result) => streamResultPromiseResolve(result)) + .then(result => ({ + value: result.value, + done: result.value === undefined + // done: result.done + })) + .then((result) => console.log('read stream', result) || streamResultPromiseResolve(result)) .catch((err) => streamResultPromiseReject(err)) return ( streamResultPromise - // .then((value) => { - // const newBuffer = new Uint8Array(value.value.byteLength) - // newBuffer.set(value.value) - - // return { - // value: newBuffer, - // done: value.done - // } - // }) .then((value) => ({ value: value.value, done: value.done @@ -104,8 +101,13 @@ const init = makeCallListener(async ( }) ) }, + clearStream: async () => { + currentStream = undefined + reader = undefined + }, randomRead: async (offset: number, bufferSize: number) => { const buffer = await randomRead(offset, bufferSize) + console.log('random read', offset, bufferSize, buffer) return buffer }, write: async (buffer: Uint8Array) => { From 4e5819aef670ff93e1720cce9fc6eaef9071a7cf Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 04:39:28 +0100 Subject: [PATCH 12/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20seek?= =?UTF-8?q?=20bug=20with=20timestampOffset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 103 +++++++++++++++++++++----------------------- src/test.ts | 56 +++++++----------------- src/worker/index.ts | 38 ++++++++++------ 3 files changed, 90 insertions(+), 107 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 669a79e..b7826d6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -226,6 +226,13 @@ extern "C" { duration = 0; first_frame = true; + prev_duration = 0; + prev_pts = 0; + prev_pos = 0; + duration = 0; + pts = 0; + pos = 0; + input_avio_context = nullptr; input_format_context = avformat_alloc_context(); // output_format_context = avformat_alloc_context(); @@ -532,7 +539,7 @@ extern "C" { prev_duration = duration; prev_pts = pts; - printf("pts: %f, duration: %f\n", prev_pts, duration); + // printf("pts: %f, duration: %f\n", prev_pts, duration); prev_pos = pos; duration = 0; @@ -593,18 +600,17 @@ extern "C" { int _seek(int timestamp, int flags) { destroy(); + init(); + + clearStream().await(); + + int res; prev_duration = 0; prev_pts = 0; prev_pos = 0; duration = 0; pts = 0; pos = 0; - init(); - read(); - - clearStream().await(); - - int res; if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, flags)) < 0) { printf("ERROR: could not seek frame | %s \n", av_err2str(res)); } @@ -659,54 +665,41 @@ extern "C" { static int readFunction(void* opaque, uint8_t* buf, int buf_size) { Remuxer &remuxObject = *reinterpret_cast(opaque); std::string buffer; - emscripten::val &randomRead = remuxObject.randomRead; - buffer = - randomRead( - static_cast(remuxObject.input_format_context->pb->pos), - buf_size - ) - .await() - .as(); - // if (remuxObject.initializing) { - // emscripten::val &randomRead = remuxObject.randomRead; - // if (remuxObject.first_init) { - // printf("first init\n"); - // buffer = - // randomRead( - // static_cast(remuxObject.input_format_context->pb->pos), - // buf_size - // ) - // .await() - // .as(); - // printf("first init 2 %d\n", buffer.size()); - // remuxObject.init_buffers.push_back(buffer); - // } else { - // printf("first init 3\n"); - // remuxObject.promise.await(); - // buffer = remuxObject.init_buffers[remuxObject.init_buffer_count]; - // printf("first init 4 %d\n", buffer.size()); - // remuxObject.init_buffer_count++; - // } - // } else { - // printf("first init 5\n"); - // emscripten::val result = - // remuxObject - // .streamRead( - // static_cast(remuxObject.input_format_context->pb->pos), - // buf_size - // ) - // .await(); - // printf("first init 6\n"); - // bool is_cancelled = result["cancelled"].as(); - // if (is_cancelled) { - // return AVERROR_EXIT; - // } - // bool is_done = result["done"].as(); - // if (is_done) { - // return AVERROR_EOF; - // } - // buffer = result["value"].as(); - // } + + if (remuxObject.initializing) { + emscripten::val &randomRead = remuxObject.randomRead; + if (remuxObject.first_init) { + buffer = + randomRead( + static_cast(remuxObject.input_format_context->pb->pos), + buf_size + ) + .await() + .as(); + remuxObject.init_buffers.push_back(buffer); + } else { + remuxObject.promise.await(); + buffer = remuxObject.init_buffers[remuxObject.init_buffer_count]; + remuxObject.init_buffer_count++; + } + } else { + emscripten::val result = + remuxObject + .streamRead( + static_cast(remuxObject.input_format_context->pb->pos), + buf_size + ) + .await(); + bool is_cancelled = result["cancelled"].as(); + if (is_cancelled) { + return AVERROR_EXIT; + } + bool is_done = result["done"].as(); + if (is_done) { + return AVERROR_EOF; + } + buffer = result["value"].as(); + } int buffer_size = buffer.size(); if (buffer_size == 0) { diff --git a/src/test.ts b/src/test.ts index 1e9b1f9..e21dd8a 100644 --- a/src/test.ts +++ b/src/test.ts @@ -278,9 +278,9 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(headerChunk.buffer) const pull = async () => { - console.log('read') + // console.log('read') const chunk = await remuxer.read() - console.log('read', chunk) + // console.log('read', chunk) chunks = [...chunks, chunk] return chunk } @@ -293,13 +293,13 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) const currentChunkIndex = chunks.findIndex(({ pts, duration }) => pts <= currentTime && pts + duration >= currentTime) const sliceIndex = Math.max(0, currentChunkIndex - PREVIOUS_BUFFER_COUNT) - console.log('currentChunkIndex', currentChunkIndex, chunks.length, currentTime) + // console.log('currentChunkIndex', currentChunkIndex, chunks.length, currentTime) for (let i = 0; i < sliceIndex + BUFFER_COUNT; i++) { - console.log('pull check', i, chunks[i]) + // console.log('pull check', i, chunks[i]) if (chunks[i]) continue - console.log('pulling') + // console.log('pulling') const chunk = await pull() - console.log('pull', chunk) + // console.log('pull', chunk) await appendBuffer(chunk.buffer) } @@ -327,9 +327,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } }) - console.log('SOURCE BUFFER', sourceBuffer) - console.log('APPEND BUFFER', appendBuffer) - const seek = queuedDebounceWithLastCall(500, async (seekTime: number) => { seeking = true @@ -344,42 +341,25 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(headerChunk.buffer) chunks = [] - console.log('seek', seekTime) - await remuxer.seek(seekTime - 20) + await remuxer.seek(seekTime - 10) await new Promise(resolve => setTimeout(resolve, 100)) - console.log('seek before append', getTimeRanges()) + video.addEventListener('canplaythrough', () => { + video.playbackRate = 1 + video.play() + }, { once: true }) const chunk1 = await pull() + sourceBuffer.timestampOffset = chunk1.pts await appendBuffer(chunk1.buffer) - console.log('appended buffer', chunk1) - await new Promise(resolve => setTimeout(resolve, 100)) - const chunk2 = await pull() - await appendBuffer(chunk2.buffer) - console.log('appended buffer', chunk2) await new Promise(resolve => setTimeout(resolve, 100)) - const chunk3 = await pull() - await appendBuffer(chunk3.buffer) - console.log('appended buffer', chunk3) - await new Promise(resolve => setTimeout(resolve, 100)) - const chunk4 = await pull() - await appendBuffer(chunk4.buffer) - console.log('appended buffer', chunk4) - console.log('seeked') - // await updateBuffers() - console.log('seeked2', chunks, getTimeRanges()) + await updateBuffers() await new Promise(resolve => setTimeout(resolve, 100)) video.currentTime = seekTime await new Promise(resolve => setTimeout(resolve, 0)) seeking = false - console.log('finished seeking') - // await video.play() - - - // for (const chunk of chunks) { - // await appendBuffer(chunk.buffer) - // } + await updateBuffers() }) appendBuffer((await pull()).buffer) @@ -397,10 +377,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) seek(video.currentTime) }) - video.addEventListener('progress', () => { - console.log('time ranges', getTimeRanges(), chunks) - }) - updateBuffers() setInterval(() => { @@ -417,9 +393,9 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) // }, 3000) setTimeout(async () => { - await video.pause() + // await video.pause() await new Promise(resolve => setTimeout(resolve, 1000)) // video.playbackRate = 5 - video.currentTime = 400 + video.currentTime = 587.618314 }, 2000) }) diff --git a/src/worker/index.ts b/src/worker/index.ts index e468b07..db4f838 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -67,7 +67,6 @@ const init = makeCallListener(async ( }, streamRead: async (offset: number) => { if (!currentStream) { - console.log('create stream', offset) currentStream = await getStream(offset) reader = currentStream.getReader() } @@ -77,14 +76,30 @@ const init = makeCallListener(async ( streamResultPromiseReject = reject }) - reader?.read() - .then(result => ({ - value: result.value, - done: result.value === undefined - // done: result.done - })) - .then((result) => console.log('read stream', result) || streamResultPromiseResolve(result)) - .catch((err) => streamResultPromiseReject(err)) + const tryReading = (): Promise | undefined => + reader + ?.read() + .then(result => ({ + value: result.value, + done: result.value === undefined + // done: result.done + })) + .then(async (result) => { + console.log('tryReading', result.done, result.value?.byteLength) + if (result.done) { + if (offset >= length) { + return streamResultPromiseResolve(result) + } + currentStream = await getStream(offset) + reader = currentStream.getReader() + return tryReading() + } + + return streamResultPromiseResolve(result) + }) + .catch((err) => streamResultPromiseReject(err)) + + tryReading() return ( streamResultPromise @@ -107,11 +122,10 @@ const init = makeCallListener(async ( }, randomRead: async (offset: number, bufferSize: number) => { const buffer = await randomRead(offset, bufferSize) - console.log('random read', offset, bufferSize, buffer) return buffer }, write: async (buffer: Uint8Array) => { - console.log('WRITE') + // console.log('WRITE') const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -122,7 +136,7 @@ const init = makeCallListener(async ( pts: number, duration: number ) => { if (!writeBuffer.byteLength) return - console.log('FLUSH', pts, duration) + // console.log('FLUSH', pts, duration) readResultPromiseResolve({ isHeader: false, offset, From 61c18e06aa90db2b8a21f766f44f16d68566e57e Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 05:03:23 +0100 Subject: [PATCH 13/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fmemory=20le?= =?UTF-8?q?ak=20bug=20on=20streams?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test.ts | 36 ++++++++++++++++-------------------- src/worker/index.ts | 5 +---- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/src/test.ts b/src/test.ts index e21dd8a..427efc4 100644 --- a/src/test.ts +++ b/src/test.ts @@ -105,11 +105,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } ).then(res => - toBufferedStream(3)( + // toBufferedStream(3)( toStreamChunkSize(BUFFER_SIZE)( res.body! ) - ) + // ) ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) @@ -333,11 +333,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await video.pause() await new Promise(resolve => setTimeout(resolve, 100)) - const bufferedRanges = getTimeRanges() - for (const { start, end } of bufferedRanges) { - await unbufferRange(start, end) - } - await appendBuffer(headerChunk.buffer) chunks = [] @@ -353,9 +348,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) const chunk1 = await pull() sourceBuffer.timestampOffset = chunk1.pts await appendBuffer(chunk1.buffer) - await new Promise(resolve => setTimeout(resolve, 100)) - await updateBuffers() - await new Promise(resolve => setTimeout(resolve, 100)) video.currentTime = seekTime await new Promise(resolve => setTimeout(resolve, 0)) seeking = false @@ -383,19 +375,23 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) seconds.textContent = video.currentTime.toString() }, 100) - // setTimeout(() => { - // setInterval(async () => { - // console.log('time ranges', getTimeRanges(), chunks) - // // for (const chunk of chunks) { - // // await appendBuffer(chunk.buffer) - // // } - // }, 1000) - // }, 3000) + // setInterval(async () => { + // console.log('time ranges', getTimeRanges(), chunks) + // }, 1000) setTimeout(async () => { // await video.pause() + video.currentTime = 587.618314 await new Promise(resolve => setTimeout(resolve, 1000)) // video.playbackRate = 5 - video.currentTime = 587.618314 - }, 2000) + video.currentTime = 400 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 300 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 500 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 600 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 300 + }, 1000) }) diff --git a/src/worker/index.ts b/src/worker/index.ts index db4f838..a127202 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -82,11 +82,10 @@ const init = makeCallListener(async ( .then(result => ({ value: result.value, done: result.value === undefined - // done: result.done })) .then(async (result) => { - console.log('tryReading', result.done, result.value?.byteLength) if (result.done) { + reader?.cancel() if (offset >= length) { return streamResultPromiseResolve(result) } @@ -125,7 +124,6 @@ const init = makeCallListener(async ( return buffer }, write: async (buffer: Uint8Array) => { - // console.log('WRITE') const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -136,7 +134,6 @@ const init = makeCallListener(async ( pts: number, duration: number ) => { if (!writeBuffer.byteLength) return - // console.log('FLUSH', pts, duration) readResultPromiseResolve({ isHeader: false, offset, From dbed4c03780addf9bd3516a7b783d4cbf897c2b4 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 05:27:28 +0100 Subject: [PATCH 14/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20leak,?= =?UTF-8?q?=20remove=20transferable=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.ts | 65 ++++++++++++++++++++++++++++++++++++++++++- src/test.ts | 16 +++++++---- src/utils.ts | 22 +++++++++++++-- src/worker/index.ts | 68 ++++++--------------------------------------- 4 files changed, 102 insertions(+), 69 deletions(-) diff --git a/src/index.ts b/src/index.ts index 5f35c49..d82d21a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -100,6 +100,13 @@ export const makeTransmuxer = async ({ const subtitles = new Map() + let currentStream: ReadableStream | undefined + let reader: ReadableStreamDefaultReader | undefined + + let streamResultPromiseResolve: (value: ReadableStreamReadResult) => void + let streamResultPromiseReject: (reason?: any) => void + let streamResultPromise: Promise> + const { init: workerInit, destroy: workerDestroy, read: workerRead, seek: workerSeek, getInfo: getInfo } = await target( 'init', @@ -149,7 +156,63 @@ export const makeTransmuxer = async ({ }, attachment: async (filename, mimetype, buffer) => attachment(filename, mimetype, buffer), randomRead: (offset, bufferSize) => _randomRead(offset, bufferSize), - getStream: (offset) => _getStream(offset), + streamRead: async (offset: number) => { + if (!currentStream) { + currentStream = await _getStream(offset) + reader = currentStream.getReader() + } + + streamResultPromise = new Promise>((resolve, reject) => { + streamResultPromiseResolve = resolve + streamResultPromiseReject = reject + }) + + const tryReading = (): Promise | undefined => + reader + ?.read() + .then(result => ({ + value: result.value?.buffer, + done: result.value === undefined, + cancelled: false + })) + .then(async (result) => { + if (result.done) { + reader?.cancel() + if (offset >= length) { + return streamResultPromiseResolve(result) + } + currentStream = await _getStream(offset) + reader = currentStream.getReader() + return tryReading() + } + + return streamResultPromiseResolve(result) + }) + .catch((err) => streamResultPromiseReject(err)) + + tryReading() + + return ( + streamResultPromise + .then((value) => ({ + value: value.value, + done: value.done, + cancelled: false + })) + .catch(err => { + console.error(err) + return { + value: undefined, + done: false, + cancelled: true + } + }) + ) + }, + clearStream: async () => { + currentStream = undefined + reader = undefined + }, write: ({ isHeader, offset, diff --git a/src/test.ts b/src/test.ts index 427efc4..0944aa1 100644 --- a/src/test.ts +++ b/src/test.ts @@ -105,11 +105,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) } } ).then(res => - // toBufferedStream(3)( + toBufferedStream(3)( toStreamChunkSize(BUFFER_SIZE)( res.body! ) - // ) + ) ), subtitle: (title, language, subtitle) => { // console.log('SUBTITLE HEADER', title, language, subtitle) @@ -382,16 +382,20 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) setTimeout(async () => { // await video.pause() video.currentTime = 587.618314 - await new Promise(resolve => setTimeout(resolve, 1000)) + await new Promise(resolve => setTimeout(resolve, 2000)) // video.playbackRate = 5 video.currentTime = 400 - await new Promise(resolve => setTimeout(resolve, 1000)) + await new Promise(resolve => setTimeout(resolve, 2000)) video.currentTime = 300 - await new Promise(resolve => setTimeout(resolve, 1000)) + await new Promise(resolve => setTimeout(resolve, 2000)) video.currentTime = 500 - await new Promise(resolve => setTimeout(resolve, 1000)) + await new Promise(resolve => setTimeout(resolve, 2000)) video.currentTime = 600 await new Promise(resolve => setTimeout(resolve, 1000)) video.currentTime = 300 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 200 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 100 }, 1000) }) diff --git a/src/utils.ts b/src/utils.ts index cd6c330..6ef0b9e 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -138,13 +138,22 @@ export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) => new ReadableStream({ reader: undefined, leftOverData: undefined, + closed: false, start() { this.reader = stream.getReader() + this.reader.closed.then(() => { + this.closed = true + }) }, async pull(controller) { const { leftOverData }: { leftOverData: Uint8Array | undefined } = this const accumulate = async ({ buffer = new Uint8Array(SIZE), currentSize = 0 } = {}): Promise<{ buffer?: Uint8Array, currentSize?: number, done: boolean }> => { + if (this.closed) { + this.reader = undefined + this.leftOverData = undefined + return { buffer: new Uint8Array(), currentSize: 0, done: true } + } const { value: newBuffer, done } = await this.reader!.read() if (currentSize === 0 && leftOverData) { @@ -154,7 +163,10 @@ export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) => } if (done) { - return { buffer: buffer.slice(0, currentSize), currentSize, done } + const finalResult = { buffer: buffer.slice(0, currentSize), currentSize, done } + this.reader = undefined + this.leftOverData = undefined + return finalResult } let newSize @@ -175,8 +187,14 @@ export const toStreamChunkSize = (SIZE: number) => (stream: ReadableStream) => }, cancel() { this.reader!.cancel() + this.reader = undefined + this.leftOverData = undefined } - } as UnderlyingDefaultSource & { reader: ReadableStreamDefaultReader | undefined, leftOverData: Uint8Array | undefined }) + } as UnderlyingDefaultSource & { + reader: ReadableStreamDefaultReader | undefined + leftOverData: Uint8Array | undefined + closed: boolean + }) export const toBufferedStream = (SIZE: number) => (stream: ReadableStream) => new ReadableStream({ diff --git a/src/worker/index.ts b/src/worker/index.ts index a127202..15b7d7d 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -21,13 +21,14 @@ let module: ReturnType // @ts-ignore const init = makeCallListener(async ( - { publicPath, length, bufferSize, randomRead, getStream, write, attachment, subtitle }: + { publicPath, length, bufferSize, randomRead, streamRead, clearStream, write, attachment, subtitle }: { publicPath: string length: number bufferSize: number randomRead: (offset: number, size: number) => Promise - getStream: (offset: number) => Promise> + streamRead: (offset: number) => Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }> + clearStream: () => Promise write: (params: { offset: number, arrayBuffer: ArrayBuffer, isHeader: boolean, position: number, pts: number, duration: number @@ -39,13 +40,6 @@ const init = makeCallListener(async ( let writeBuffer = new Uint8Array(0) - let currentStream: ReadableStream | undefined - let reader: ReadableStreamDefaultReader | undefined - - let streamResultPromiseResolve: (value: ReadableStreamReadResult) => void - let streamResultPromiseReject: (reason?: any) => void - let streamResultPromise: Promise> - let readResultPromiseResolve: (value: Chunk) => void let readResultPromiseReject: (reason?: any) => void let readResultPromise: Promise @@ -66,59 +60,13 @@ const init = makeCallListener(async ( attachment(filename, mimetype, arraybuffer) }, streamRead: async (offset: number) => { - if (!currentStream) { - currentStream = await getStream(offset) - reader = currentStream.getReader() + const res = await streamRead(offset) + return { + ...res, + value: new Uint8Array(res.value) } - - streamResultPromise = new Promise>((resolve, reject) => { - streamResultPromiseResolve = resolve - streamResultPromiseReject = reject - }) - - const tryReading = (): Promise | undefined => - reader - ?.read() - .then(result => ({ - value: result.value, - done: result.value === undefined - })) - .then(async (result) => { - if (result.done) { - reader?.cancel() - if (offset >= length) { - return streamResultPromiseResolve(result) - } - currentStream = await getStream(offset) - reader = currentStream.getReader() - return tryReading() - } - - return streamResultPromiseResolve(result) - }) - .catch((err) => streamResultPromiseReject(err)) - - tryReading() - - return ( - streamResultPromise - .then((value) => ({ - value: value.value, - done: value.done - })) - .catch(err => { - return { - value: undefined, - done: false, - cancelled: true - } - }) - ) - }, - clearStream: async () => { - currentStream = undefined - reader = undefined }, + clearStream: () => clearStream(), randomRead: async (offset: number, bufferSize: number) => { const buffer = await randomRead(offset, bufferSize) return buffer From efad8f74797832df923cbdebe75dcfa1e353ffb4 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 05:51:06 +0100 Subject: [PATCH 15/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20long?= =?UTF-8?q?=20standing=20stream=20requests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.ts | 1 + src/test.ts | 12 +++++++----- vite.config.ts | 18 ++++++++++++++++-- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/index.ts b/src/index.ts index d82d21a..d015538 100644 --- a/src/index.ts +++ b/src/index.ts @@ -210,6 +210,7 @@ export const makeTransmuxer = async ({ ) }, clearStream: async () => { + reader?.cancel() currentStream = undefined reader = undefined }, diff --git a/src/test.ts b/src/test.ts index 0944aa1..5b7b647 100644 --- a/src/test.ts +++ b/src/test.ts @@ -328,6 +328,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) }) const seek = queuedDebounceWithLastCall(500, async (seekTime: number) => { + const p = performance.now() seeking = true await video.pause() @@ -352,6 +353,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await new Promise(resolve => setTimeout(resolve, 0)) seeking = false await updateBuffers() + console.log('seek time', performance.now() - p) }) appendBuffer((await pull()).buffer) @@ -391,11 +393,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) video.currentTime = 500 await new Promise(resolve => setTimeout(resolve, 2000)) video.currentTime = 600 - await new Promise(resolve => setTimeout(resolve, 1000)) + await new Promise(resolve => setTimeout(resolve, 2000)) video.currentTime = 300 - await new Promise(resolve => setTimeout(resolve, 1000)) - video.currentTime = 200 - await new Promise(resolve => setTimeout(resolve, 1000)) - video.currentTime = 100 + // await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 200 + // await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 100 }, 1000) }) diff --git a/vite.config.ts b/vite.config.ts index 0cbfe51..9b8a131 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -20,6 +20,20 @@ export default defineConfig((env) => ({ env.mode === 'development' ? [] : [commonjs()] - ) - ] + ), + { + name: 'configure-response-headers', + configureServer: (server) => { + server.middlewares.use((_req, res, next) => { + res.setHeader('Cache-Control', 'no-store') + next() + }) + } + } + ], + server: { + fs: { + allow: ['../..'] + } + } })) From a33c37ba02b2b24d3760763e625f673c9ab53592 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 16:59:51 +0100 Subject: [PATCH 16/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fbackwards?= =?UTF-8?q?=20seek=20&=20improve=20latency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.ts | 9 +------- src/main.cpp | 4 ++-- src/test.ts | 51 ++++++++++++++++----------------------------- src/worker/index.ts | 2 +- 4 files changed, 22 insertions(+), 44 deletions(-) diff --git a/src/index.ts b/src/index.ts index d015538..cfdd56f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,13 +2,6 @@ import type { Resolvers as WorkerResolvers } from './worker' import { call } from 'osra' -import { SEEK_FLAG, SEEK_WHENCE_FLAG } from './utils' - -export { - SEEK_FLAG, - SEEK_WHENCE_FLAG -} - export type MakeTransmuxerOptions = { /** Path that will be used to locate the .wasm file imported from the worker */ publicPath: string @@ -242,7 +235,7 @@ export const makeTransmuxer = async ({ return workerDestroy() }, read: () => workerRead(), - seek: (time: number) => workerSeek(Math.max(0, time) * 1000, SEEK_FLAG.NONE), + seek: (time: number) => workerSeek(Math.max(0, time) * 1000), getInfo: () => getInfo() as Promise<{ input: MediaInfo, output: MediaInfo }> } diff --git a/src/main.cpp b/src/main.cpp index b7826d6..045c86d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -598,7 +598,7 @@ extern "C" { return output_avio_context->pos; } - int _seek(int timestamp, int flags) { + int _seek(int timestamp) { destroy(); init(); @@ -611,7 +611,7 @@ extern "C" { duration = 0; pts = 0; pos = 0; - if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, flags)) < 0) { + if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, AVSEEK_FLAG_BACKWARD)) < 0) { printf("ERROR: could not seek frame | %s \n", av_err2str(res)); } return 0; diff --git a/src/test.ts b/src/test.ts index 5b7b647..2466921 100644 --- a/src/test.ts +++ b/src/test.ts @@ -1,7 +1,7 @@ // @ts-ignore import PQueue from 'p-queue' -import { SEEK_WHENCE_FLAG, queuedDebounceWithLastCall, toBufferedStream, toStreamChunkSize } from './utils' +import { queuedDebounceWithLastCall, toBufferedStream, toStreamChunkSize } from './utils' import { makeTransmuxer } from '.' type Chunk = { @@ -13,11 +13,8 @@ type Chunk = { } const BUFFER_SIZE = 2_500_000 -const VIDEO_URL = '../video5.mkv' +const VIDEO_URL = '../video2.mkv' // const VIDEO_URL = '../spidey.mkv' -const PRE_SEEK_NEEDED_BUFFERS_IN_SECONDS = 10 -const POST_SEEK_NEEDED_BUFFERS_IN_SECONDS = 15 -const POST_SEEK_REMOVE_BUFFERS_IN_SECONDS = 60 export default async function saveFile(plaintext: ArrayBuffer, fileName: string, fileType: string) { return new Promise((resolve, reject) => { @@ -273,7 +270,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) let chunks: Chunk[] = [] const PREVIOUS_BUFFER_COUNT = 1 - const BUFFER_COUNT = 5 + const BUFFER_COUNT = 3 await appendBuffer(headerChunk.buffer) @@ -330,33 +327,21 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) const seek = queuedDebounceWithLastCall(500, async (seekTime: number) => { const p = performance.now() seeking = true - - await video.pause() - await new Promise(resolve => setTimeout(resolve, 100)) - await appendBuffer(headerChunk.buffer) - chunks = [] - await remuxer.seek(seekTime - 10) - - await new Promise(resolve => setTimeout(resolve, 100)) - - video.addEventListener('canplaythrough', () => { - video.playbackRate = 1 - video.play() - }, { once: true }) - + await remuxer.seek(seekTime) const chunk1 = await pull() sourceBuffer.timestampOffset = chunk1.pts await appendBuffer(chunk1.buffer) - video.currentTime = seekTime - await new Promise(resolve => setTimeout(resolve, 0)) seeking = false await updateBuffers() console.log('seek time', performance.now() - p) }) - appendBuffer((await pull()).buffer) + console.log('pulling first chunk') + const firstChunk = await pull() + console.log('first chunk', firstChunk) + appendBuffer(firstChunk.buffer) video.addEventListener('timeupdate', () => { updateBuffers() @@ -384,19 +369,19 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) setTimeout(async () => { // await video.pause() video.currentTime = 587.618314 - await new Promise(resolve => setTimeout(resolve, 2000)) + await new Promise(resolve => setTimeout(resolve, 1000)) // video.playbackRate = 5 video.currentTime = 400 - await new Promise(resolve => setTimeout(resolve, 2000)) - video.currentTime = 300 - await new Promise(resolve => setTimeout(resolve, 2000)) - video.currentTime = 500 - await new Promise(resolve => setTimeout(resolve, 2000)) - video.currentTime = 600 - await new Promise(resolve => setTimeout(resolve, 2000)) - video.currentTime = 300 // await new Promise(resolve => setTimeout(resolve, 1000)) - // video.currentTime = 200 + // video.currentTime = 300 + // await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 500 + // await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 600 + // await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 300 + await new Promise(resolve => setTimeout(resolve, 1000)) + video.currentTime = 534.953306 // await new Promise(resolve => setTimeout(resolve, 1000)) // video.currentTime = 100 }, 1000) diff --git a/src/worker/index.ts b/src/worker/index.ts index 15b7d7d..eb49214 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -115,7 +115,7 @@ const init = makeCallListener(async ( module = undefined writeBuffer = new Uint8Array(0) }, - seek: (timestamp: number, flags: SEEK_FLAG) => remuxer.seek(timestamp, flags), + seek: (timestamp: number) => remuxer.seek(timestamp), read: () => { readResultPromise = new Promise((resolve, reject) => { readResultPromiseResolve = resolve From 2b85084b6b4cfaa1f239ff68b41b6e6ae150ec03 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 17:42:53 +0100 Subject: [PATCH 17/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20init?= =?UTF-8?q?=20on=20specific=20files?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 22 +++++++++++++++------- src/test.ts | 4 ++-- src/worker/index.ts | 10 +++++++++- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 045c86d..8f50398 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -517,10 +517,12 @@ extern "C" { av_packet_rescale_ts(packet, in_stream->time_base, out_stream->time_base); if (in_stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { - // printf("pts: %f, prev duration: %f, duration: %f\n", packet->pts * av_q2d(out_stream->time_base), duration, packet->duration * av_q2d(out_stream->time_base)); + printf("pts: %f, prev duration: %f, duration: %f\n", packet->pts * av_q2d(out_stream->time_base), duration, packet->duration * av_q2d(out_stream->time_base)); duration += packet->duration * av_q2d(out_stream->time_base); } + bool empty_flush = false; + // Set needed pts/pos/duration needed to calculate the real timestamps if (is_keyframe) { bool was_header = is_header; @@ -528,18 +530,18 @@ extern "C" { is_header = false; } else { is_flushing = true; - flush( + empty_flush = flush( static_cast(input_format_context->pb->pos), prev_pos, prev_pts, prev_duration - ).await(); + ).await().as(); flushed = true; } prev_duration = duration; prev_pts = pts; - // printf("pts: %f, duration: %f\n", prev_pts, duration); + printf("pts: %f, duration: %f\n", prev_pts, duration); prev_pos = pos; duration = 0; @@ -554,15 +556,20 @@ extern "C" { continue; } - if (is_flushing) { - flush( + printf("flush: %d %d %d\n", is_flushing, flushed, empty_flush); + if (is_flushing && empty_flush) { + empty_flush = flush( static_cast(input_format_context->pb->pos), prev_pos, prev_pts, prev_duration - ).await(); + ).await().as(); is_flushing = false; flushed = true; + + if (empty_flush) { + flushed = false; + } } // free packet @@ -715,6 +722,7 @@ extern "C" { // Write callback called by AVIOContext static int writeFunction(void* opaque, uint8_t* buf, int buf_size) { + printf("writeFunction\n"); Remuxer &remuxObject = *reinterpret_cast(opaque); if (remuxObject.initializing && !remuxObject.first_init) { diff --git a/src/test.ts b/src/test.ts index 2466921..2599313 100644 --- a/src/test.ts +++ b/src/test.ts @@ -275,9 +275,9 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(headerChunk.buffer) const pull = async () => { - // console.log('read') + console.log('read') const chunk = await remuxer.read() - // console.log('read', chunk) + console.log('read', chunk) chunks = [...chunks, chunk] return chunk } diff --git a/src/worker/index.ts b/src/worker/index.ts index eb49214..d0d9af0 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -60,7 +60,9 @@ const init = makeCallListener(async ( attachment(filename, mimetype, arraybuffer) }, streamRead: async (offset: number) => { + console.log('worker streamRead', offset) const res = await streamRead(offset) + console.log('worker streamRead end', offset, res) return { ...res, value: new Uint8Array(res.value) @@ -68,10 +70,12 @@ const init = makeCallListener(async ( }, clearStream: () => clearStream(), randomRead: async (offset: number, bufferSize: number) => { + console.log('worker randomRead', offset, bufferSize) const buffer = await randomRead(offset, bufferSize) return buffer }, write: async (buffer: Uint8Array) => { + console.log('worker write', buffer.byteLength) const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -81,7 +85,8 @@ const init = makeCallListener(async ( offset: number, position: number, pts: number, duration: number ) => { - if (!writeBuffer.byteLength) return + console.log('worker flush', writeBuffer.byteLength) + if (!writeBuffer.byteLength) return true readResultPromiseResolve({ isHeader: false, offset, @@ -91,6 +96,7 @@ const init = makeCallListener(async ( duration }) writeBuffer = new Uint8Array(0) + return false } }) @@ -117,11 +123,13 @@ const init = makeCallListener(async ( }, seek: (timestamp: number) => remuxer.seek(timestamp), read: () => { + console.log('worker read') readResultPromise = new Promise((resolve, reject) => { readResultPromiseResolve = resolve readResultPromiseReject = reject }) remuxer.read() + console.log('worker read end') return readResultPromise }, getInfo: () => remuxer.getInfo() From 24ee9619023ddad64cea8910fd1d6f261d439f00 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 17:57:51 +0100 Subject: [PATCH 18/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fcleanup=20&?= =?UTF-8?q?=20remove=20packet=20timestamp=20warn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/worker/index.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/worker/index.ts b/src/worker/index.ts index d0d9af0..104887f 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -4,14 +4,13 @@ import type { Chunk } from '..' // @ts-ignore import WASMModule from 'libav' -import { SEEK_FLAG, SEEK_WHENCE_FLAG } from '../utils' - const makeModule = (publicPath: string) => WASMModule({ - module: { - locateFile: (path: string) => `${publicPath}${path.replace('/dist', '')}` - }, - locateFile: (path: string) => `${publicPath}${path.replace('/dist', '')}` + locateFile: (path: string) => `${publicPath}${path.replace('/dist', '')}`, + printErr: (text: string) => + text.includes('Timestamps are unset in a packet') + ? undefined + : console.error(text), }) let module: ReturnType From d3ec0245133a41d79f0bc0648ef9975cada1510c Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 18:22:45 +0100 Subject: [PATCH 19/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20>4gb?= =?UTF-8?q?=20files=20seek?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refactor: ♻️fix >4gb files seek --- src/main.cpp | 29 +++++++++++++++++++---------- src/worker/index.ts | 11 +++++++---- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 8f50398..9786223 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,6 +1,7 @@ #include #include #include +#include using namespace emscripten; using namespace std; @@ -11,6 +12,14 @@ extern "C" { #include }; +template +inline std::string to_string (const T& t) +{ + std::stringstream ss; + ss << t; + return ss.str(); +} + int main() { return 0; } @@ -434,8 +443,8 @@ extern "C" { } if (first_init) { flush( - static_cast(input_format_context->pb->pos), - 0, + to_string(input_format_context->pb->pos), + to_string(0), 0, 0 ); @@ -460,8 +469,8 @@ extern "C" { is_flushing = true; av_write_trailer(output_format_context); flush( - static_cast(input_format_context->pb->pos), - pos, + to_string(input_format_context->pb->pos), + to_string(pos), pts, duration ); @@ -531,8 +540,8 @@ extern "C" { } else { is_flushing = true; empty_flush = flush( - static_cast(input_format_context->pb->pos), - prev_pos, + to_string(input_format_context->pb->pos), + to_string(prev_pos), prev_pts, prev_duration ).await().as(); @@ -559,8 +568,8 @@ extern "C" { printf("flush: %d %d %d\n", is_flushing, flushed, empty_flush); if (is_flushing && empty_flush) { empty_flush = flush( - static_cast(input_format_context->pb->pos), - prev_pos, + to_string(input_format_context->pb->pos), + to_string(prev_pos), prev_pts, prev_duration ).await().as(); @@ -678,7 +687,7 @@ extern "C" { if (remuxObject.first_init) { buffer = randomRead( - static_cast(remuxObject.input_format_context->pb->pos), + to_string(remuxObject.input_format_context->pb->pos), buf_size ) .await() @@ -693,7 +702,7 @@ extern "C" { emscripten::val result = remuxObject .streamRead( - static_cast(remuxObject.input_format_context->pb->pos), + to_string(remuxObject.input_format_context->pb->pos), buf_size ) .await(); diff --git a/src/worker/index.ts b/src/worker/index.ts index 104887f..2437139 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -58,7 +58,8 @@ const init = makeCallListener(async ( const arraybuffer = uint8.buffer.slice(uint8.byteOffset, uint8.byteOffset + uint8.byteLength) attachment(filename, mimetype, arraybuffer) }, - streamRead: async (offset: number) => { + streamRead: async (_offset: string) => { + const offset = Number(_offset) console.log('worker streamRead', offset) const res = await streamRead(offset) console.log('worker streamRead end', offset, res) @@ -68,8 +69,8 @@ const init = makeCallListener(async ( } }, clearStream: () => clearStream(), - randomRead: async (offset: number, bufferSize: number) => { - console.log('worker randomRead', offset, bufferSize) + randomRead: async (_offset: number, bufferSize: number) => { + const offset = Number(_offset) const buffer = await randomRead(offset, bufferSize) return buffer }, @@ -81,9 +82,11 @@ const init = makeCallListener(async ( writeBuffer = newBuffer }, flush: async ( - offset: number, position: number, + _offset: number, _position: number, pts: number, duration: number ) => { + const offset = Number(_offset) + const position = Number(_position) console.log('worker flush', writeBuffer.byteLength) if (!writeBuffer.byteLength) return true readResultPromiseResolve({ From 8758ebf9a74cf16a00f9f4d3357455412525ba7b Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 18:26:27 +0100 Subject: [PATCH 20/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fcleanup=20l?= =?UTF-8?q?ogs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 6 ++---- src/test.ts | 38 ++++++++++++-------------------------- src/worker/index.ts | 6 ------ 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 9786223..b77f4c8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -526,7 +526,7 @@ extern "C" { av_packet_rescale_ts(packet, in_stream->time_base, out_stream->time_base); if (in_stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { - printf("pts: %f, prev duration: %f, duration: %f\n", packet->pts * av_q2d(out_stream->time_base), duration, packet->duration * av_q2d(out_stream->time_base)); + // printf("pts: %f, prev duration: %f, duration: %f\n", packet->pts * av_q2d(out_stream->time_base), duration, packet->duration * av_q2d(out_stream->time_base)); duration += packet->duration * av_q2d(out_stream->time_base); } @@ -550,7 +550,7 @@ extern "C" { prev_duration = duration; prev_pts = pts; - printf("pts: %f, duration: %f\n", prev_pts, duration); + // printf("pts: %f, duration: %f\n", prev_pts, duration); prev_pos = pos; duration = 0; @@ -565,7 +565,6 @@ extern "C" { continue; } - printf("flush: %d %d %d\n", is_flushing, flushed, empty_flush); if (is_flushing && empty_flush) { empty_flush = flush( to_string(input_format_context->pb->pos), @@ -731,7 +730,6 @@ extern "C" { // Write callback called by AVIOContext static int writeFunction(void* opaque, uint8_t* buf, int buf_size) { - printf("writeFunction\n"); Remuxer &remuxObject = *reinterpret_cast(opaque); if (remuxObject.initializing && !remuxObject.first_init) { diff --git a/src/test.ts b/src/test.ts index 2599313..31a1d77 100644 --- a/src/test.ts +++ b/src/test.ts @@ -13,8 +13,8 @@ type Chunk = { } const BUFFER_SIZE = 2_500_000 -const VIDEO_URL = '../video2.mkv' -// const VIDEO_URL = '../spidey.mkv' +// const VIDEO_URL = '../video5.mkv' +const VIDEO_URL = '../spidey.mkv' export default async function saveFile(plaintext: ArrayBuffer, fileName: string, fileType: string) { return new Promise((resolve, reject) => { @@ -169,11 +169,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) 'waiting' ] - for (const event of allVideoEvents) { - video.addEventListener(event, ev => { - console.log('video event', event, ev) - }) - } + // for (const event of allVideoEvents) { + // video.addEventListener(event, ev => { + // console.log('video event', event, ev) + // }) + // } const seconds = document.createElement('div') video.controls = true @@ -253,20 +253,11 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) end: sourceBuffer.buffered.end(index) })) - // video.addEventListener('timeupdate', () => { - // seconds.textContent = video.currentTime.toString() - // }) - video.addEventListener('canplaythrough', () => { video.playbackRate = 1 video.play() }, { once: true }) - const logAndAppend = async (chunk: Chunk) => { - console.log('res', chunk) - await appendBuffer(chunk.buffer) - } - let chunks: Chunk[] = [] const PREVIOUS_BUFFER_COUNT = 1 @@ -275,9 +266,7 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(headerChunk.buffer) const pull = async () => { - console.log('read') const chunk = await remuxer.read() - console.log('read', chunk) chunks = [...chunks, chunk] return chunk } @@ -335,12 +324,9 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) await appendBuffer(chunk1.buffer) seeking = false await updateBuffers() - console.log('seek time', performance.now() - p) }) - console.log('pulling first chunk') const firstChunk = await pull() - console.log('first chunk', firstChunk) appendBuffer(firstChunk.buffer) video.addEventListener('timeupdate', () => { @@ -368,10 +354,10 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) setTimeout(async () => { // await video.pause() - video.currentTime = 587.618314 - await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 587.618314 + // await new Promise(resolve => setTimeout(resolve, 1000)) // video.playbackRate = 5 - video.currentTime = 400 + // video.currentTime = 400 // await new Promise(resolve => setTimeout(resolve, 1000)) // video.currentTime = 300 // await new Promise(resolve => setTimeout(resolve, 1000)) @@ -380,8 +366,8 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) // video.currentTime = 600 // await new Promise(resolve => setTimeout(resolve, 1000)) // video.currentTime = 300 - await new Promise(resolve => setTimeout(resolve, 1000)) - video.currentTime = 534.953306 + // await new Promise(resolve => setTimeout(resolve, 1000)) + // video.currentTime = 534.953306 // await new Promise(resolve => setTimeout(resolve, 1000)) // video.currentTime = 100 }, 1000) diff --git a/src/worker/index.ts b/src/worker/index.ts index 2437139..917f8e1 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -60,9 +60,7 @@ const init = makeCallListener(async ( }, streamRead: async (_offset: string) => { const offset = Number(_offset) - console.log('worker streamRead', offset) const res = await streamRead(offset) - console.log('worker streamRead end', offset, res) return { ...res, value: new Uint8Array(res.value) @@ -75,7 +73,6 @@ const init = makeCallListener(async ( return buffer }, write: async (buffer: Uint8Array) => { - console.log('worker write', buffer.byteLength) const newBuffer = new Uint8Array(writeBuffer.byteLength + buffer.byteLength) newBuffer.set(writeBuffer) newBuffer.set(new Uint8Array(buffer), writeBuffer.byteLength) @@ -87,7 +84,6 @@ const init = makeCallListener(async ( ) => { const offset = Number(_offset) const position = Number(_position) - console.log('worker flush', writeBuffer.byteLength) if (!writeBuffer.byteLength) return true readResultPromiseResolve({ isHeader: false, @@ -125,13 +121,11 @@ const init = makeCallListener(async ( }, seek: (timestamp: number) => remuxer.seek(timestamp), read: () => { - console.log('worker read') readResultPromise = new Promise((resolve, reject) => { readResultPromiseResolve = resolve readResultPromiseReject = reject }) remuxer.read() - console.log('worker read end') return readResultPromise }, getInfo: () => remuxer.getInfo() From 146428e105cb12e126cee6dbe262a8bd8e97a5c1 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 18:52:45 +0100 Subject: [PATCH 21/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Fcache=20see?= =?UTF-8?q?k=20fetch=20request?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main.cpp | 26 ++++++++++++++++++++++++++ src/test.ts | 5 ++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index b77f4c8..ce135b5 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -92,6 +92,12 @@ extern "C" { vector init_buffers; int init_buffer_count = 0; + bool first_seek = true; + bool seeking = false; + + vector seek_buffers; + int seek_buffer_count = 0; + val promise = val::undefined(); // val promise = val::global("Promise")["resolve"]().as(); // val promise = val::global("Promise").get("resolve").as(); @@ -230,6 +236,7 @@ extern "C" { void init () { initializing = true; init_buffer_count = 0; + seek_buffer_count = 0; int res; is_header = true; duration = 0; @@ -614,6 +621,7 @@ extern "C" { } int _seek(int timestamp) { + seeking = true; destroy(); init(); @@ -629,6 +637,8 @@ extern "C" { if ((res = av_seek_frame(input_format_context, video_stream_index, timestamp, AVSEEK_FLAG_BACKWARD)) < 0) { printf("ERROR: could not seek frame | %s \n", av_err2str(res)); } + seeking = false; + first_seek = false; return 0; } @@ -697,6 +707,22 @@ extern "C" { buffer = remuxObject.init_buffers[remuxObject.init_buffer_count]; remuxObject.init_buffer_count++; } + } else if(remuxObject.seeking) { + emscripten::val &randomRead = remuxObject.randomRead; + if (remuxObject.first_seek) { + buffer = + randomRead( + to_string(remuxObject.input_format_context->pb->pos), + buf_size + ) + .await() + .as(); + remuxObject.seek_buffers.push_back(buffer); + } else { + remuxObject.promise.await(); + buffer = remuxObject.seek_buffers[remuxObject.seek_buffer_count]; + remuxObject.seek_buffer_count++; + } } else { emscripten::val result = remuxObject diff --git a/src/test.ts b/src/test.ts index 31a1d77..b4f93c9 100644 --- a/src/test.ts +++ b/src/test.ts @@ -13,8 +13,8 @@ type Chunk = { } const BUFFER_SIZE = 2_500_000 -// const VIDEO_URL = '../video5.mkv' -const VIDEO_URL = '../spidey.mkv' +const VIDEO_URL = '../video5.mkv' +// const VIDEO_URL = '../spidey.mkv' export default async function saveFile(plaintext: ArrayBuffer, fileName: string, fileType: string) { return new Promise((resolve, reject) => { @@ -314,7 +314,6 @@ fetch(VIDEO_URL, { headers: { Range: `bytes=0-1` } }) }) const seek = queuedDebounceWithLastCall(500, async (seekTime: number) => { - const p = performance.now() seeking = true await appendBuffer(headerChunk.buffer) chunks = [] From 4cfa8f54f871c2e71677d3109b1223c054573183 Mon Sep 17 00:00:00 2001 From: Banou Date: Sun, 28 Jan 2024 19:45:30 +0100 Subject: [PATCH 22/22] =?UTF-8?q?refactor:=20=E2=99=BB=EF=B8=8Ffix=20TS=20?= =?UTF-8?q?typings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/index.ts | 6 +++--- src/worker/index.ts | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/index.ts b/src/index.ts index cfdd56f..a7d5366 100644 --- a/src/index.ts +++ b/src/index.ts @@ -96,9 +96,9 @@ export const makeTransmuxer = async ({ let currentStream: ReadableStream | undefined let reader: ReadableStreamDefaultReader | undefined - let streamResultPromiseResolve: (value: ReadableStreamReadResult) => void + let streamResultPromiseResolve: (value: { value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }) => void let streamResultPromiseReject: (reason?: any) => void - let streamResultPromise: Promise> + let streamResultPromise: Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }> const { init: workerInit, destroy: workerDestroy, read: workerRead, seek: workerSeek, getInfo: getInfo } = await target( @@ -155,7 +155,7 @@ export const makeTransmuxer = async ({ reader = currentStream.getReader() } - streamResultPromise = new Promise>((resolve, reject) => { + streamResultPromise = new Promise<{ value: ArrayBuffer | undefined, done: boolean, cancelled: boolean }>((resolve, reject) => { streamResultPromiseResolve = resolve streamResultPromiseReject = reject }) diff --git a/src/worker/index.ts b/src/worker/index.ts index 917f8e1..42ede1c 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -63,7 +63,7 @@ const init = makeCallListener(async ( const res = await streamRead(offset) return { ...res, - value: new Uint8Array(res.value) + value: new Uint8Array(res.value ?? []) } }, clearStream: () => clearStream(),