Skip to content
10 changes: 9 additions & 1 deletion Rx.NET/Documentation/ReleaseHistory/Rx.v6.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# Rx Release History v6.0
# Rx Release History v6.0

## 6.1.0

This release adds:

* A `DisposeWith`extension method for `IDisposable` to simplify disposal in conjunction with `CompositeDisposable` (see [#2178](https://github.com/dotnet/reactive/pull/2178) thanks to [Chris Pulman](https://github.com/ChrisPulman)
* A new overload of `TakeUntil` accepting a `CancellationToken` (see [#2181](https://github.com/dotnet/reactive/issues/2181) thanks to [Nils Aufschläger](https://github.com/nilsauf)


## v6.0.2

Expand Down
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ internal partial interface IQueryLanguage
IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources);
IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate);
IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, CancellationToken cancellationToken);
IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);
Expand Down
22 changes: 20 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Multiple.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Configuration;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -865,7 +864,7 @@ public static IObservable<TSource> TakeUntil<TSource, TOther>(this IObservable<T
/// .Subscribe(Console.WriteLine);
/// </code>
/// </example>
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
if (source == null)
Expand All @@ -881,6 +880,25 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
return s_impl.TakeUntil(source, stopPredicate);
}

/// <summary>
/// Relays elements from the source observable sequence until the provided <paramref name="cancellationToken"/> is cancelled.
/// Completes immediately if the provided <paramref name="cancellationToken"/> is already cancelled upon subscription.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
/// <param name="source">The source sequence to relay elements of.</param>
/// <param name="cancellationToken">The cancellation token to complete the target observable sequence on.</param>
/// <returns>The observable sequence with the source elements until the provided <paramref name="cancellationToken"/> is cancelled.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <code>null</code>.</exception>
public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken cancellationToken)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

return s_impl.TakeUntil(source, cancellationToken);
}

#endregion

#region + Window +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Relays items to the downstream until the CancellationToken is cancelled.
/// </summary>
/// <typeparam name="TSource">The element type of the sequence</typeparam>
internal sealed class TakeUntilCancellationToken<TSource> :
Producer<TSource, TakeUntilCancellationToken<TSource>._>
{
private readonly IObservable<TSource> _source;
private readonly CancellationToken _token;

public TakeUntilCancellationToken(IObservable<TSource> source, CancellationToken token)
{
_source = source;
_token = token;
}

protected override _ CreateSink(IObserver<TSource> observer) => new(observer);

protected override void Run(_ sink) => sink.Run(this);

internal sealed class _ : IdentitySink<TSource>
{
private SingleAssignmentDisposableValue _cancellationTokenRegistration;
private int _wip;
private Exception? _error;

public _(IObserver<TSource> observer) : base(observer)
{
}

public void Run(TakeUntilCancellationToken<TSource> parent)
{
if (parent._token.IsCancellationRequested)
{
OnCompleted();
return;
}

_cancellationTokenRegistration.Disposable = parent._token.Register(OnCompleted);
Run(parent._source);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
_cancellationTokenRegistration.Dispose();
}
base.Dispose(disposing);
}

public override void OnNext(TSource value)
{
HalfSerializer.ForwardOnNext(this, value, ref _wip, ref _error);
}

public override void OnError(Exception error)
{
HalfSerializer.ForwardOnError(this, error, ref _wip, ref _error);
}

public override void OnCompleted()
{
HalfSerializer.ForwardOnCompleted(this, ref _wip, ref _error);
}
}
}
}
27 changes: 26 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13761,6 +13761,31 @@ public static IQbservable<IList<TSource>> TakeLastBuffer<TSource>(this IQbservab
);
}

/// <summary>
/// Relays elements from the source observable sequence until the provided <paramref name="cancellationToken" /> is cancelled.
/// Completes immediately if the provided <paramref name="cancellationToken" /> is already cancelled upon subscription.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
/// <param name="source">The source sequence to relay elements of.</param>
/// <param name="cancellationToken">The cancellation token to complete the target observable sequence on.</param>
/// <returns>The observable sequence with the source elements until the provided <paramref name="cancellationToken" /> is cancelled.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <code>null</code>.</exception>
public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, CancellationToken cancellationToken)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TSource)),
source.Expression,
Expression.Constant(cancellationToken, typeof(CancellationToken))
)
);
}


/// <summary>
/// Takes elements for the specified duration until the specified end time.
/// </summary>
Expand Down Expand Up @@ -13858,7 +13883,7 @@ public static IQbservable<TSource> TakeUntil<TSource, TOther>(this IQbservable<T
/// .Subscribe(Console.WriteLine);
/// </code>
/// </example>
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, bool>> stopPredicate)
{
if (source == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
Expand Down Expand Up @@ -282,6 +283,11 @@ public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> sour
return new TakeUntilPredicate<TSource>(source, stopPredicate);
}

public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
{
return new TakeUntilCancellationToken<TSource>(source, cancellationToken);
}

#endregion

#region + Window +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,7 @@ public static System.IObservable<TSource> TakeLast<TSource>(this System.IObserva
public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> stopPredicate) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Threading.CancellationToken cancellationToken) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> TakeUntil<TSource, TOther>(this System.IObservable<TSource> source, System.IObservable<TOther> other) { }
public static System.IObservable<TSource> TakeWhile<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
Expand Down Expand Up @@ -2296,6 +2297,7 @@ public static System.Reactive.Linq.IQbservable<TSource> TakeLast<TSource>(this S
public static System.Reactive.Linq.IQbservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> stopPredicate) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Threading.CancellationToken cancellationToken) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource, TOther>(this System.Reactive.Linq.IQbservable<TSource> source, System.IObservable<TOther> other) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,5 +860,64 @@ public void TakeUntil_Predicate_Crash()

#endregion

#region + CancellationToken +

[TestMethod]
public void TakeUntil_CancellationToken_BasicCancelation()
{
var scheduler = new TestScheduler();
var tokenSource = new CancellationTokenSource();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnCompleted<int>(260)
);

scheduler.ScheduleAbsolute(235, () => tokenSource.Cancel());

var result = scheduler.Start(() => source.TakeUntil(tokenSource.Token));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnCompleted<int>(235)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 235)
);
}

[TestMethod]
public void TakeUntil_CancellationToken_AlreadyCanceled()
{
var scheduler = new TestScheduler();
var tokenSource = new CancellationTokenSource();
tokenSource.Cancel();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnCompleted<int>(260)
);

var result = scheduler.Start(() => source.TakeUntil(tokenSource.Token));

result.Messages.AssertEqual(
OnCompleted<int>(200)
);

Assert.Empty(source.Subscriptions);
}

#endregion
}
}