Skip to content

Commit 707d004

Browse files
authored
Manually mark how much data has been consumed from the source in muxer. (#3720)
1 parent b054da3 commit 707d004

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed

src/core/operators/muxer.ml

+5-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class muxer tracks =
8787
let pos, frame =
8888
List.fold_left
8989
(fun (pos, frame) { fields; source } ->
90-
let buf = source#get_frame in
90+
let buf = source#peek_frame in
9191
( min pos (Frame.position buf),
9292
List.fold_left
9393
(fun frame { source_field; target_field; processor } ->
@@ -97,7 +97,10 @@ class muxer tracks =
9797
(length, Frame.create ~length Frame.Fields.empty)
9898
tracks
9999
in
100-
Frame.slice frame pos
100+
let frame = Frame.slice frame pos in
101+
let consumed = Frame.position frame in
102+
List.iter (fun { source } -> source#consumed consumed) tracks;
103+
frame
101104
end
102105

103106
let muxer_operator p =

src/core/source.ml

+1
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ class virtual operator ?pos ?(name = "src") sources =
613613
consumed <- max consumed (Frame.position data);
614614
data
615615

616+
method consumed n = consumed <- max consumed n
616617
method get_frame = self#get_partial_frame (fun f -> f)
617618

618619
method get_mutable_content field =

src/core/source.mli

+10-1
Original file line numberDiff line numberDiff line change
@@ -258,12 +258,21 @@ class virtual source :
258258
that was effectively used. This method is used when a consumer of the source's data
259259
only uses an initial chunk of the frame. In this case, the remaining data is cached
260260
whenever possible and returned during the next streaming cycle. Final returned value
261-
is the same as the partial chunk returned for the callback for easy method call chaining. *)
261+
is the same as the partial chunk returned from the callback for easy method call chaining.
262+
263+
Calling this method is equivalent to doing: {[
264+
let frame = Frame.slice source#peek_frame len in
265+
source#consumed (Frame.position frame);
266+
frame
267+
]} *)
262268
method get_partial_frame : (Frame.t -> Frame.t) -> Frame.t
263269

264270
(** Check a frame without consuming any of its data. *)
265271
method peek_frame : Frame.t
266272

273+
(** Manually mark amount of consumed data from the source. *)
274+
method consumed : int -> unit
275+
267276
(** This method requests a specific field of the frame that can be mutated. It is used
268277
by a consumer of the source that will modify the source's data (e.g. [amplify]). The
269278
source will do its best to minimize data copy according to the streaming context. Typically,

0 commit comments

Comments
 (0)