From 93f29d302ea8c76eb91dbe9f905ec364168adfcf Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Fri, 20 Jan 2023 15:04:05 +0700 Subject: [PATCH] fix(throttle): fix issue #709, where throttled Stream does not emit done event. --- .../backpressure/backpressure.dart | 84 ++++++++++++------- .../backpressure/throttle_time_test.dart | 29 +++++++ 2 files changed, 81 insertions(+), 32 deletions(-) diff --git a/lib/src/transformers/backpressure/backpressure.dart b/lib/src/transformers/backpressure/backpressure.dart index b8cba2726..5c523728f 100644 --- a/lib/src/transformers/backpressure/backpressure.dart +++ b/lib/src/transformers/backpressure/backpressure.dart @@ -54,7 +54,7 @@ class _BackpressureStreamSink extends ForwardingSink { @override void onData(S data) { _hasData = true; - maybeCreateWindow(data, sink); + maybeCreateWindow(data); if (skip == 0) { queue.add(data); @@ -68,7 +68,7 @@ class _BackpressureStreamSink extends ForwardingSink { skip--; } - maybeCloseWindow(sink); + maybeCloseWindow(); } @override @@ -79,20 +79,27 @@ class _BackpressureStreamSink extends ForwardingSink { _mainClosed = true; if (_strategy == WindowStrategy.eventAfterLastWindow) { + resolveWindowEnd(isControllerClosing: true, isWindowClosed: false); return; } // treat the final event as a Window that opens // and immediately closes again if (_dispatchOnClose && queue.isNotEmpty) { - resolveWindowStart(queue.last, sink); + resolveWindowStart(queue.last); } - resolveWindowEnd(sink, true); + resolveWindowEnd(isControllerClosing: true, isWindowClosed: false); + clearAndClose(); + } + + void clearAndClose() { queue.clear(); _windowSubscription?.cancel(); + _windowSubscription = null; + sink.close(); } @@ -108,33 +115,33 @@ class _BackpressureStreamSink extends ForwardingSink { @override void onResume() => _windowSubscription?.resume(); - void maybeCreateWindow(S event, EventSink sink) { + void maybeCreateWindow(S event) { switch (_strategy) { // for example throttle case WindowStrategy.eventAfterLastWindow: if (_windowSubscription != null) return; - _windowSubscription = singleWindow(event, sink); + _windowSubscription = singleWindow(event); - resolveWindowStart(event, sink); + resolveWindowStart(event); break; // for example scan case WindowStrategy.firstEventOnly: if (_windowSubscription != null) return; - _windowSubscription = multiWindow(event, sink); + _windowSubscription = multiWindow(event); - resolveWindowStart(event, sink); + resolveWindowStart(event); break; // for example debounce case WindowStrategy.everyEvent: _windowSubscription?.cancel(); - _windowSubscription = singleWindow(event, sink); + _windowSubscription = singleWindow(event); - resolveWindowStart(event, sink); + resolveWindowStart(event); break; case WindowStrategy.onHandler: @@ -142,29 +149,31 @@ class _BackpressureStreamSink extends ForwardingSink { } } - void maybeCloseWindow(EventSink sink) { + void maybeCloseWindow() { if (_closeWindowWhen != null && _closeWindowWhen!(unmodifiableQueue)) { - resolveWindowEnd(sink); + resolveWindowEnd(isControllerClosing: false, isWindowClosed: false); } } - StreamSubscription singleWindow(S event, EventSink sink) => - buildStream(event, sink).take(1).listen( + StreamSubscription singleWindow(S event) => + buildStream(event).take(1).listen( null, onError: sink.addError, - onDone: () => resolveWindowEnd(sink, _mainClosed), + onDone: () => resolveWindowEnd( + isControllerClosing: _mainClosed, isWindowClosed: true), ); // opens a new Window which is kept open until the main Stream // closes. - StreamSubscription multiWindow(S event, EventSink sink) => - buildStream(event, sink).listen( - (dynamic _) => resolveWindowEnd(sink), + StreamSubscription multiWindow(S event) => buildStream(event).listen( + (dynamic _) => resolveWindowEnd( + isControllerClosing: _mainClosed, isWindowClosed: false), onError: sink.addError, - onDone: () => resolveWindowEnd(sink), + onDone: () => resolveWindowEnd( + isControllerClosing: _mainClosed, isWindowClosed: true), ); - Stream buildStream(S event, EventSink sink) { + Stream buildStream(S event) { Stream stream; _windowSubscription?.cancel(); @@ -174,27 +183,38 @@ class _BackpressureStreamSink extends ForwardingSink { return stream; } - void resolveWindowStart(S event, EventSink sink) { + void resolveWindowStart(S event) { if (_onWindowStart != null) { sink.add(_onWindowStart!(event)); } } - void resolveWindowEnd(EventSink sink, [bool isControllerClosing = false]) { + void resolveWindowEnd({ + required bool isControllerClosing, + required bool isWindowClosed, + }) { if (isControllerClosing && _strategy == WindowStrategy.eventAfterLastWindow) { - if (_dispatchOnClose && - _hasData && - queue.length > 1 && - _onWindowEnd != null) { - sink.add(_onWindowEnd!(unmodifiableQueue)); + // has no last data, close immediately + if (!_hasData || queue.length == 1) { + clearAndClose(); + return; } - queue.clear(); - _windowSubscription?.cancel(); - _windowSubscription = null; + // once the Stream has emitted done event, there may still be a pending data + // waiting to be emitted. If so, wait for the window to end and then + // emit it. + if (!isWindowClosed) { + // defer until the window closes + return; + } + + // send the last event + if (_dispatchOnClose && _onWindowEnd != null) { + sink.add(_onWindowEnd!(unmodifiableQueue)); + } - sink.close(); + clearAndClose(); return; } diff --git a/test/transformers/backpressure/throttle_time_test.dart b/test/transformers/backpressure/throttle_time_test.dart index b84dc119c..563a81746 100644 --- a/test/transformers/backpressure/throttle_time_test.dart +++ b/test/transformers/backpressure/throttle_time_test.dart @@ -15,6 +15,26 @@ void main() { emitsInOrder([1, 4, 7, emitsDone])); }); + test('Rx.throttleTime.trailing.empty', () async { + await expectLater( + Stream.empty().throttleTime( + const Duration(milliseconds: 250), + leading: false, + trailing: true, + ), + emitsDone, + ); + + await expectLater( + Stream.empty().throttleTime( + const Duration(milliseconds: 250), + leading: true, + trailing: true, + ), + emitsDone, + ); + }); + test('Rx.throttleTime.trailing', () async { await expectLater( _stream() @@ -99,4 +119,13 @@ void main() { (s) => s.throttleTime(Duration.zero), ); }); + + test('issue/709 throttled stream closes', () async { + final c = StreamController(); + unawaited(Future.delayed(Duration(milliseconds: 500)) + .then((f) => c.close())); + + final s = c.stream.throttleTime(Duration(milliseconds: 100)); + await for (var _ in s) {} + }); }