From 9cee95e7d23145302ee764b74d07ff0311360e60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=BCcke?= <317346+fabianmuecke@users.noreply.github.com> Date: Wed, 17 Jul 2024 11:19:28 +0200 Subject: [PATCH 1/4] Added @MainActor to SharedSequence, Driver and Signal functions taking closure arguments. --- .../Traits/Driver/Driver+Subscription.swift | 10 +++-- .../SharedSequence+Operators.swift | 45 ++++++++++++------- .../Traits/Signal/Signal+Subscription.swift | 10 +++-- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/RxCocoa/Traits/Driver/Driver+Subscription.swift b/RxCocoa/Traits/Driver/Driver+Subscription.swift index 0b9024c75..bab00ec51 100644 --- a/RxCocoa/Traits/Driver/Driver+Subscription.swift +++ b/RxCocoa/Traits/Driver/Driver+Subscription.swift @@ -155,10 +155,11 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency @MainActor public func drive( with object: Object, - onNext: ((Object, Element) -> Void)? = nil, - onCompleted: ((Object) -> Void)? = nil, + onNext: (@MainActor (Object, Element) -> Void)? = nil, + onCompleted: (@MainActor (Object) -> Void)? = nil, onDisposed: ((Object) -> Void)? = nil ) -> Disposable { MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage) @@ -178,9 +179,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency @MainActor public func drive( - onNext: ((Element) -> Void)? = nil, - onCompleted: (() -> Void)? = nil, + onNext: (@MainActor (Element) -> Void)? = nil, + onCompleted: (@MainActor () -> Void)? = nil, onDisposed: (() -> Void)? = nil ) -> Disposable { MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift index 1d53b03d9..cbfafaae6 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift @@ -17,7 +17,8 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to each source element. - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. */ - public func map(_ selector: @escaping (Element) -> Result) -> SharedSequence { + @preconcurrency @MainActor + public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { let source = self .asObservable() .map(selector) @@ -35,7 +36,8 @@ extension SharedSequenceConvertibleType { - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. */ - public func compactMap(_ selector: @escaping (Element) -> Result?) -> SharedSequence { + @preconcurrency @MainActor + public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { let source = self .asObservable() .compactMap(selector) @@ -51,7 +53,8 @@ extension SharedSequenceConvertibleType { - parameter predicate: A function to test each source element for a condition. - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. */ - public func filter(_ predicate: @escaping (Element) -> Bool) -> SharedSequence { + @preconcurrency @MainActor + public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { let source = self .asObservable() .filter(predicate) @@ -92,7 +95,8 @@ extension SharedSequenceConvertibleType { - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. */ - public func flatMapLatest(_ selector: @escaping (Element) -> SharedSequence) + @preconcurrency @MainActor + public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self .asObservable() @@ -111,7 +115,8 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. */ - public func flatMapFirst(_ selector: @escaping (Element) -> SharedSequence) + @preconcurrency @MainActor + public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self .asObservable() @@ -134,7 +139,8 @@ extension SharedSequenceConvertibleType { - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. - returns: The source sequence with the side-effecting behavior applied. */ - public func `do`(onNext: ((Element) -> Void)? = nil, afterNext: ((Element) -> Void)? = nil, onCompleted: (() -> Void)? = nil, afterCompleted: (() -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil) + @preconcurrency @MainActor + public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) -> SharedSequence { let source = self.asObservable() .do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose) @@ -184,7 +190,8 @@ extension SharedSequenceConvertibleType { - parameter keySelector: A function to compute the comparison key for each element. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. */ - public func distinctUntilChanged(_ keySelector: @escaping (Element) -> Key) -> SharedSequence { + @preconcurrency @MainActor + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) return SharedSequence(source) @@ -196,7 +203,8 @@ extension SharedSequenceConvertibleType { - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. */ - public func distinctUntilChanged(_ comparer: @escaping (Element, Element) -> Bool) -> SharedSequence { + @preconcurrency @MainActor + public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged({ $0 }, comparer: comparer) return SharedSequence(source) @@ -209,7 +217,8 @@ extension SharedSequenceConvertibleType { - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. */ - public func distinctUntilChanged(_ keySelector: @escaping (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { + @preconcurrency @MainActor + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: comparer) return SharedSequence(source) @@ -226,7 +235,8 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to each element. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. */ - public func flatMap(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { + @preconcurrency @MainActor + public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { let source = self.asObservable() .flatMap(selector) @@ -355,7 +365,8 @@ extension SharedSequenceConvertibleType { - parameter accumulator: An accumulator function to be invoked on each element. - returns: An observable sequence containing the accumulated values. */ - public func scan(_ seed: A, accumulator: @escaping (A, Element) -> A) + @preconcurrency @MainActor + public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) -> SharedSequence { let source = self.asObservable() .scan(seed, accumulator: accumulator) @@ -398,7 +409,8 @@ extension SharedSequence { - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - public static func zip(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence + @preconcurrency @MainActor + public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) return SharedSequence(source) @@ -425,7 +437,8 @@ extension SharedSequence { - parameter resultSelector: Function to invoke whenever any of the sources produces an element. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - public static func combineLatest(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence + @preconcurrency @MainActor + public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) return SharedSequence(source) @@ -456,9 +469,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. */ + @preconcurrency @MainActor public func withUnretained( _ obj: Object, - resultSelector: @escaping (Object, Element) -> Out + resultSelector: @escaping @MainActor (Object, Element) -> Out ) -> SharedSequence { SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) } @@ -503,7 +517,8 @@ extension SharedSequenceConvertibleType { - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. */ - public func withLatestFrom(_ second: SecondO, resultSelector: @escaping (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { + @preconcurrency @MainActor + public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { let source = self.asObservable() .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) diff --git a/RxCocoa/Traits/Signal/Signal+Subscription.swift b/RxCocoa/Traits/Signal/Signal+Subscription.swift index 4a6add336..6444ede1f 100644 --- a/RxCocoa/Traits/Signal/Signal+Subscription.swift +++ b/RxCocoa/Traits/Signal/Signal+Subscription.swift @@ -130,10 +130,11 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency @MainActor public func emit( with object: Object, - onNext: ((Object, Element) -> Void)? = nil, - onCompleted: ((Object) -> Void)? = nil, + onNext: (@MainActor (Object, Element) -> Void)? = nil, + onCompleted: (@MainActor (Object) -> Void)? = nil, onDisposed: ((Object) -> Void)? = nil ) -> Disposable { self.asObservable().subscribe( @@ -156,9 +157,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ + @preconcurrency @MainActor public func emit( - onNext: ((Element) -> Void)? = nil, - onCompleted: (() -> Void)? = nil, + onNext: (@MainActor (Element) -> Void)? = nil, + onCompleted: (@MainActor () -> Void)? = nil, onDisposed: (() -> Void)? = nil ) -> Disposable { self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed) From 20a894b91ebf49f2224e9277a6aa289dbee3f3a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=BCcke?= <317346+fabianmuecke@users.noreply.github.com> Date: Mon, 28 Oct 2024 11:59:35 +0100 Subject: [PATCH 2/4] Added MainActorSharingStrategyProtocol to allow shared sequences which are not limited to the main actor. --- Rx.xcodeproj/project.pbxproj | 12 +- RxCocoa/Traits/Driver/Driver.swift | 2 +- .../SharedSequence+Operators+MainActor.swift | 540 ++++++++++++++++++ .../SharedSequence+Operators.swift | 93 +-- .../SharedSequence/SharedSequence.swift | 5 + RxCocoa/Traits/Signal/Signal.swift | 2 +- .../SharedSequence+Operators+MainActor.swift | 1 + Tests/RxCocoaTests/SharedSequence+Test.swift | 83 ++- 8 files changed, 642 insertions(+), 96 deletions(-) create mode 100644 RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift create mode 120000 Sources/RxCocoa/SharedSequence+Operators+MainActor.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index a4c8c9fb0..f873c09da 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -651,7 +651,6 @@ C89AB1DA1DAAC3350065FBE6 /* Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B11DAAC3350065FBE6 /* Driver.swift */; }; C89AB1DE1DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */; }; C89AB1EA1DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */; }; - C89AB1F21DAAC3350065FBE6 /* SharedSequence+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */; }; C89AB1F61DAAC3350065FBE6 /* SharedSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */; }; C89AB2021DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1BD1DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift */; }; C89AB2061DAAC3350065FBE6 /* KVORepresentable+Swift.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1BE1DAAC3350065FBE6 /* KVORepresentable+Swift.swift */; }; @@ -789,6 +788,8 @@ CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CD8F7AC527BA9187001574EB /* Infallible+Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */; }; CDDEF16A1D4FB40000CA8546 /* Disposables.swift in Sources */ = {isa = PBXBuildFile; fileRef = CDDEF1691D4FB40000CA8546 /* Disposables.swift */; }; + D2B78EEC2CCF9F8B0054AB01 /* SharedSequence+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */; }; + D2B78EEE2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */; }; D9080ACF1EA05AE0002B433B /* RxNavigationControllerDelegateProxy.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080ACD1EA05A16002B433B /* RxNavigationControllerDelegateProxy.swift */; }; D9080AD41EA05DE9002B433B /* UINavigationController+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080AD21EA05DDF002B433B /* UINavigationController+Rx.swift */; }; D9080AD81EA06189002B433B /* UINavigationController+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080AD71EA06189002B433B /* UINavigationController+RxTests.swift */; }; @@ -1353,7 +1354,6 @@ C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ObservableConvertibleType+Driver.swift"; sourceTree = ""; }; C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators+arity.swift"; sourceTree = ""; }; C89AB1B71DAAC3350065FBE6 /* SharedSequence+Operators+arity.tt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = "SharedSequence+Operators+arity.tt"; sourceTree = ""; }; - C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators.swift"; sourceTree = ""; }; C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SharedSequence.swift; sourceTree = ""; }; C89AB1BD1DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "KVORepresentable+CoreGraphics.swift"; sourceTree = ""; }; C89AB1BE1DAAC3350065FBE6 /* KVORepresentable+Swift.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "KVORepresentable+Swift.swift"; sourceTree = ""; }; @@ -1453,6 +1453,8 @@ CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Infallible+Driver.swift"; sourceTree = ""; }; CDDEF1691D4FB40000CA8546 /* Disposables.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Disposables.swift; sourceTree = ""; }; + D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators.swift"; sourceTree = ""; }; + D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators+MainActor.swift"; sourceTree = ""; }; D9080ACD1EA05A16002B433B /* RxNavigationControllerDelegateProxy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxNavigationControllerDelegateProxy.swift; sourceTree = ""; }; D9080AD21EA05DDF002B433B /* UINavigationController+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationController+Rx.swift"; sourceTree = ""; }; D9080AD71EA06189002B433B /* UINavigationController+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationController+RxTests.swift"; sourceTree = ""; }; @@ -2346,7 +2348,8 @@ children = ( C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */, C89AB1B71DAAC3350065FBE6 /* SharedSequence+Operators+arity.tt */, - C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */, + D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */, + D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */, DB08833626FB0637005805BE /* SharedSequence+Concurrency.swift */, C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */, C85E6FBD1F53025700C5681E /* SchedulerType+SharedSequence.swift */, @@ -3045,6 +3048,7 @@ B562478F203515DD00D3EE75 /* RxCollectionViewDataSourcePrefetchingProxy.swift in Sources */, 84E4D3921C9AFD3400ADFDC9 /* UISearchController+Rx.swift in Sources */, C88254341B8A752B00B02D69 /* UITableView+Rx.swift in Sources */, + D2B78EEC2CCF9F8B0054AB01 /* SharedSequence+Operators.swift in Sources */, CD8F7AC527BA9187001574EB /* Infallible+Driver.swift in Sources */, C89AB1A61DAAC25A0065FBE6 /* RxCocoaObjCRuntimeError+Extensions.swift in Sources */, C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */, @@ -3080,7 +3084,6 @@ C89AB1CA1DAAC3350065FBE6 /* ControlProperty.swift in Sources */, ECBBA59E1DF8C0D400DDDC2E /* RxTabBarControllerDelegateProxy.swift in Sources */, 78F2D93E24C8D35700D13F0C /* RxWKNavigationDelegateProxy.swift in Sources */, - C89AB1F21DAAC3350065FBE6 /* SharedSequence+Operators.swift in Sources */, 9BA1CBD31C0F7D550044B50A /* UIActivityIndicatorView+Rx.swift in Sources */, 842A5A2C1C357F92003568D5 /* NSTextStorage+Rx.swift in Sources */, C88254241B8A752B00B02D69 /* RxTextViewDelegateProxy.swift in Sources */, @@ -3098,6 +3101,7 @@ C89AB2501DAAC3A60065FBE6 /* _RXObjCRuntime.m in Sources */, C89AB21E1DAAC3350065FBE6 /* NSObject+Rx.swift in Sources */, D9080AD41EA05DE9002B433B /* UINavigationController+Rx.swift in Sources */, + D2B78EEE2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift in Sources */, 88718CFE1CE5D80000D88D60 /* UITabBar+Rx.swift in Sources */, 88D98F2E1CE7549A00D50457 /* RxTabBarDelegateProxy.swift in Sources */, C88254331B8A752B00B02D69 /* UISwitch+Rx.swift in Sources */, diff --git a/RxCocoa/Traits/Driver/Driver.swift b/RxCocoa/Traits/Driver/Driver.swift index 5de8b3a56..0301315d2 100644 --- a/RxCocoa/Traits/Driver/Driver.swift +++ b/RxCocoa/Traits/Driver/Driver.swift @@ -37,7 +37,7 @@ import RxSwift */ public typealias Driver = SharedSequence -public struct DriverSharingStrategy: SharingStrategyProtocol { +public struct DriverSharingStrategy: MainActorSharingStrategyProtocol { public static var scheduler: SchedulerType { SharingScheduler.make() } public static func share(_ source: Observable) -> Observable { source.share(replay: 1, scope: .whileConnected) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift new file mode 100644 index 000000000..e72a4ebe7 --- /dev/null +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift @@ -0,0 +1,540 @@ +// +// SharedSequence+Operators.swift +// RxCocoa +// +// Created by Krunoslav Zaher on 9/19/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import RxSwift + +// MARK: map +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence into a new form. + + - parameter selector: A transform function to apply to each source element. + - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. + */ + @preconcurrency @MainActor + public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { + let source = self + .asObservable() + .map(selector) + return SharedSequence(source) + } +} + +// MARK: compactMap +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence into an optional form and filters all optional results. + + - parameter selector: A transform function to apply to each source element and which returns an element or nil. + - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. + + */ + @preconcurrency @MainActor + public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { + let source = self + .asObservable() + .compactMap(selector) + return SharedSequence(source) + } +} + +// MARK: filter +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Filters the elements of an observable sequence based on a predicate. + + - parameter predicate: A function to test each source element for a condition. + - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. + */ + @preconcurrency @MainActor + public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { + let source = self + .asObservable() + .filter(predicate) + return SharedSequence(source) + } +} + +// MARK: switchLatest +extension SharedSequenceConvertibleType where Element: SharedSequenceConvertibleType, SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Transforms an observable sequence of observable sequences into an observable sequence + producing values only from the most recent observable sequence. + + Each time a new inner observable sequence is received, unsubscribe from the + previous inner observable sequence. + + - returns: The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. + */ + public func switchLatest() -> SharedSequence { + let source: Observable = self + .asObservable() + .map { $0.asSharedSequence() } + .switchLatest() + return SharedSequence(source) + } +} + +// MARK: flatMapLatest +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Projects each element of an observable sequence into a new sequence of observable sequences and then + transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. + + It is a combination of `map` + `switchLatest` operator + + - parameter selector: A transform function to apply to each element. + - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an + Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. + */ + @preconcurrency @MainActor + public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) + -> SharedSequence { + let source: Observable = self + .asObservable() + .flatMapLatest(selector) + return SharedSequence(source) + } +} + +// MARK: flatMapFirst +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + If element is received while there is some projected observable sequence being merged it will simply be ignored. + + - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. + */ + @preconcurrency @MainActor + public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) + -> SharedSequence { + let source: Observable = self + .asObservable() + .flatMapFirst(selector) + return SharedSequence(source) + } +} + +// MARK: do +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Invokes an action for each event in the observable sequence, and propagates all observer messages through the result sequence. + + - parameter onNext: Action to invoke for each element in the observable sequence. + - parameter afterNext: Action to invoke for each element after the observable has passed an onNext event along to its downstream. + - parameter onCompleted: Action to invoke upon graceful termination of the observable sequence. + - parameter afterCompleted: Action to invoke after graceful termination of the observable sequence. + - parameter onSubscribe: Action to invoke before subscribing to source observable sequence. + - parameter onSubscribed: Action to invoke after subscribing to source observable sequence. + - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. + - returns: The source sequence with the side-effecting behavior applied. + */ + @preconcurrency @MainActor + public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) + -> SharedSequence { + let source = self.asObservable() + .do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose) + + return SharedSequence(source) + } +} + +// MARK: debug +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Prints received events for all observers on standard output. + + - parameter identifier: Identifier that is printed together with event description to standard output. + - returns: An observable sequence whose events are printed to standard output. + */ + public func debug(_ identifier: String? = nil, trimOutput: Bool = false, file: String = #file, line: UInt = #line, function: String = #function) -> SharedSequence { + let source = self.asObservable() + .debug(identifier, trimOutput: trimOutput, file: file, line: line, function: function) + return SharedSequence(source) + } +} + +// MARK: distinctUntilChanged +extension SharedSequenceConvertibleType where Element: Equatable, SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an observable sequence that contains only distinct contiguous elements according to equality operator. + + - returns: An observable sequence only containing the distinct contiguous elements, based on equality operator, from the source sequence. + */ + public func distinctUntilChanged() + -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged({ $0 }, comparer: { ($0 == $1) }) + + return SharedSequence(source) + } +} + +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the `keySelector`. + + - parameter keySelector: A function to compute the comparison key for each element. + - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. + */ + @preconcurrency @MainActor + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) + return SharedSequence(source) + } + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the `comparer`. + + - parameter comparer: Equality comparer for computed key values. + - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. + */ + @preconcurrency @MainActor + public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged({ $0 }, comparer: comparer) + return SharedSequence(source) + } + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the keySelector and the comparer. + + - parameter keySelector: A function to compute the comparison key for each element. + - parameter comparer: Equality comparer for computed key values. + - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. + */ + @preconcurrency @MainActor + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged(keySelector, comparer: comparer) + return SharedSequence(source) + } +} + + +// MARK: flatMap +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + + - parameter selector: A transform function to apply to each element. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + */ + @preconcurrency @MainActor + public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { + let source = self.asObservable() + .flatMap(selector) + + return SharedSequence(source) + } +} + +// MARK: merge +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges elements from all observable sequences from collection into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Collection of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Merges elements from all observable sequences from array into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Array of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: [SharedSequence]) -> SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Merges elements from all observable sequences into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Collection of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: SharedSequence...) -> SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + +} + +// MARK: merge +extension SharedSequenceConvertibleType where Element: SharedSequenceConvertibleType, SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence. + + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public func merge() -> SharedSequence { + let source = self.asObservable() + .map { $0.asSharedSequence() } + .merge() + return SharedSequence(source) + } + + /** + Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences. + + - parameter maxConcurrent: Maximum number of inner observable sequences being subscribed to concurrently. + - returns: The observable sequence that merges the elements of the inner sequences. + */ + public func merge(maxConcurrent: Int) + -> SharedSequence { + let source = self.asObservable() + .map { $0.asSharedSequence() } + .merge(maxConcurrent: maxConcurrent) + return SharedSequence(source) + } +} + +// MARK: throttle +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an Observable that emits the first and the latest item emitted by the source Observable during sequential time windows of a specified duration. + + This operator makes sure that no two elements are emitted in less then dueTime. + + - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html) + + - parameter dueTime: Throttling duration for each element. + - parameter latest: Should latest element received in a dueTime wide time window since last element emission be emitted. + - returns: The throttled sequence. + */ + public func throttle(_ dueTime: RxTimeInterval, latest: Bool = true) + -> SharedSequence { + let source = self.asObservable() + .throttle(dueTime, latest: latest, scheduler: SharingStrategy.scheduler) + + return SharedSequence(source) + } + + /** + Ignores elements from an observable sequence which are followed by another element within a specified relative time duration, using the specified scheduler to run throttling timers. + + - parameter dueTime: Throttling duration for each element. + - returns: The throttled sequence. + */ + public func debounce(_ dueTime: RxTimeInterval) + -> SharedSequence { + let source = self.asObservable() + .debounce(dueTime, scheduler: SharingStrategy.scheduler) + + return SharedSequence(source) + } +} + +// MARK: scan +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Applies an accumulator function over an observable sequence and returns each intermediate result. The specified seed value is used as the initial accumulator value. + + For aggregation behavior with no intermediate results, see `reduce`. + + - parameter seed: The initial accumulator value. + - parameter accumulator: An accumulator function to be invoked on each element. + - returns: An observable sequence containing the accumulated values. + */ + @preconcurrency @MainActor + public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) + -> SharedSequence { + let source = self.asObservable() + .scan(seed, accumulator: accumulator) + return SharedSequence(source) + } +} + +// MARK: concat + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + public static func concat(_ sequence: Sequence) -> SharedSequence + where Sequence.Element == SharedSequence { + let source = Observable.concat(sequence.lazy.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + public static func concat(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.concat(collection.map { $0.asObservable() }) + return SharedSequence(source) + } +} + +// MARK: zip + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index. + + - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. + - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + @preconcurrency @MainActor + public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) + return SharedSequence(source) + } + + /** + Merges the specified observable sequences into one observable sequence all of the observable sequences have produced an element at a corresponding index. + + - returns: An observable sequence containing the result of combining elements of the sources. + */ + public static func zip(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }) + return SharedSequence(source) + } +} + +// MARK: combineLatest + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element. + + - parameter resultSelector: Function to invoke whenever any of the sources produces an element. + - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + @preconcurrency @MainActor + public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) + return SharedSequence(source) + } + + /** + Merges the specified observable sequences into one observable sequence whenever any of the observable sequences produces an element. + + - returns: An observable sequence containing the result of combining elements of the sources. + */ + public static func combineLatest(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.combineLatest(collection.map { $0.asObservable() }) + return SharedSequence(source) + } +} + +// MARK: - withUnretained +extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy { + /** + Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. + + In the case the provided object cannot be retained successfully, the sequence will complete. + + - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. + + - parameter obj: The object to provide an unretained reference on. + - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. + - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. + */ + @preconcurrency @MainActor + public func withUnretained( + _ obj: Object, + resultSelector: @escaping @MainActor (Object, Element) -> Out + ) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) + } + + /** + Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. + + In the case the provided object cannot be retained successfully, the sequence will complete. + + - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. + + - parameter obj: The object to provide an unretained reference on. + - returns: An observable sequence of tuples that contains both an unretained reference on `obj` and the values of the original sequence. + */ + public func withUnretained(_ obj: Object) -> SharedSequence { + withUnretained(obj) { ($0, $1) } + } +} + +extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy { + @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) + public func withUnretained( + _ obj: Object, + resultSelector: @escaping (Object, Element) -> Out + ) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) + } + + @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) + public func withUnretained(_ obj: Object) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj) { ($0, $1) }) + } +} + +// MARK: withLatestFrom +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Merges two observable sequences into one observable sequence by combining each element from self with the latest element from the second source, if any. + + - parameter second: Second observable source. + - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. + - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. + */ + @preconcurrency @MainActor + public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { + let source = self.asObservable() + .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) + + return SharedSequence(source) + } + + /** + Merges two observable sequences into one observable sequence by using latest element from the second sequence every time when `self` emits an element. + + - parameter second: Second observable source. + - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. + */ + public func withLatestFrom(_ second: SecondO) -> SharedSequence { + let source = self.asObservable() + .withLatestFrom(second.asSharedSequence()) + + return SharedSequence(source) + } +} diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift index cbfafaae6..7a207826f 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift @@ -17,8 +17,7 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to each source element. - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. */ - @preconcurrency @MainActor - public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { + public func map(_ selector: @escaping (Element) -> Result) -> SharedSequence { let source = self .asObservable() .map(selector) @@ -36,8 +35,7 @@ extension SharedSequenceConvertibleType { - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. */ - @preconcurrency @MainActor - public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { + public func compactMap(_ selector: @escaping (Element) -> Result?) -> SharedSequence { let source = self .asObservable() .compactMap(selector) @@ -53,8 +51,7 @@ extension SharedSequenceConvertibleType { - parameter predicate: A function to test each source element for a condition. - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. */ - @preconcurrency @MainActor - public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { + public func filter(_ predicate: @escaping (Element) -> Bool) -> SharedSequence { let source = self .asObservable() .filter(predicate) @@ -95,8 +92,7 @@ extension SharedSequenceConvertibleType { - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. */ - @preconcurrency @MainActor - public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) + public func flatMapLatest(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self .asObservable() @@ -115,8 +111,7 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. */ - @preconcurrency @MainActor - public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) + public func flatMapFirst(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self .asObservable() @@ -139,8 +134,7 @@ extension SharedSequenceConvertibleType { - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. - returns: The source sequence with the side-effecting behavior applied. */ - @preconcurrency @MainActor - public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) + public func `do`(onNext: ((Element) -> Void)? = nil, afterNext: ((Element) -> Void)? = nil, onCompleted: (() -> Void)? = nil, afterCompleted: (() -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil) -> SharedSequence { let source = self.asObservable() .do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose) @@ -190,8 +184,7 @@ extension SharedSequenceConvertibleType { - parameter keySelector: A function to compute the comparison key for each element. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. */ - @preconcurrency @MainActor - public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { + public func distinctUntilChanged(_ keySelector: @escaping (Element) -> Key) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) return SharedSequence(source) @@ -203,8 +196,7 @@ extension SharedSequenceConvertibleType { - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. */ - @preconcurrency @MainActor - public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { + public func distinctUntilChanged(_ comparer: @escaping (Element, Element) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged({ $0 }, comparer: comparer) return SharedSequence(source) @@ -217,8 +209,7 @@ extension SharedSequenceConvertibleType { - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. */ - @preconcurrency @MainActor - public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { + public func distinctUntilChanged(_ keySelector: @escaping (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: comparer) return SharedSequence(source) @@ -235,8 +226,7 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to each element. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. */ - @preconcurrency @MainActor - public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { + public func flatMap(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { let source = self.asObservable() .flatMap(selector) @@ -365,8 +355,7 @@ extension SharedSequenceConvertibleType { - parameter accumulator: An accumulator function to be invoked on each element. - returns: An observable sequence containing the accumulated values. */ - @preconcurrency @MainActor - public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) + public func scan(_ seed: A, accumulator: @escaping (A, Element) -> A) -> SharedSequence { let source = self.asObservable() .scan(seed, accumulator: accumulator) @@ -409,8 +398,7 @@ extension SharedSequence { - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - @preconcurrency @MainActor - public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + public static func zip(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) return SharedSequence(source) @@ -437,8 +425,7 @@ extension SharedSequence { - parameter resultSelector: Function to invoke whenever any of the sources produces an element. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - @preconcurrency @MainActor - public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + public static func combineLatest(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) return SharedSequence(source) @@ -456,57 +443,6 @@ extension SharedSequence { } } -// MARK: - withUnretained -extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy { - /** - Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. - - In the case the provided object cannot be retained successfully, the sequence will complete. - - - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. - - - parameter obj: The object to provide an unretained reference on. - - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. - - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. - */ - @preconcurrency @MainActor - public func withUnretained( - _ obj: Object, - resultSelector: @escaping @MainActor (Object, Element) -> Out - ) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) - } - - /** - Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. - - In the case the provided object cannot be retained successfully, the sequence will complete. - - - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. - - - parameter obj: The object to provide an unretained reference on. - - returns: An observable sequence of tuples that contains both an unretained reference on `obj` and the values of the original sequence. - */ - public func withUnretained(_ obj: Object) -> SharedSequence { - withUnretained(obj) { ($0, $1) } - } -} - -extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy { - @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) - public func withUnretained( - _ obj: Object, - resultSelector: @escaping (Object, Element) -> Out - ) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) - } - - @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) - public func withUnretained(_ obj: Object) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj) { ($0, $1) }) - } -} - // MARK: withLatestFrom extension SharedSequenceConvertibleType { @@ -517,8 +453,7 @@ extension SharedSequenceConvertibleType { - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. */ - @preconcurrency @MainActor - public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { + public func withLatestFrom(_ second: SecondO, resultSelector: @escaping (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { let source = self.asObservable() .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence.swift b/RxCocoa/Traits/SharedSequence/SharedSequence.swift index 4596c8ec0..b86ff97c1 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence.swift @@ -81,6 +81,11 @@ public protocol SharingStrategyProtocol { static func share(_ source: Observable) -> Observable } +/** + A marker protocol for all sharing strategies, which are guaranteed to run on the main thread. + */ +public protocol MainActorSharingStrategyProtocol: SharingStrategyProtocol {} + /** A type that can be converted to `SharedSequence`. */ diff --git a/RxCocoa/Traits/Signal/Signal.swift b/RxCocoa/Traits/Signal/Signal.swift index e066b7ec2..db2153241 100644 --- a/RxCocoa/Traits/Signal/Signal.swift +++ b/RxCocoa/Traits/Signal/Signal.swift @@ -29,7 +29,7 @@ import RxSwift */ public typealias Signal = SharedSequence -public struct SignalSharingStrategy: SharingStrategyProtocol { +public struct SignalSharingStrategy: MainActorSharingStrategyProtocol { public static var scheduler: SchedulerType { SharingScheduler.make() } public static func share(_ source: Observable) -> Observable { diff --git a/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift b/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift new file mode 120000 index 000000000..49285c88e --- /dev/null +++ b/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift @@ -0,0 +1 @@ +../../RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift \ No newline at end of file diff --git a/Tests/RxCocoaTests/SharedSequence+Test.swift b/Tests/RxCocoaTests/SharedSequence+Test.swift index e2406f477..afd3095bd 100644 --- a/Tests/RxCocoaTests/SharedSequence+Test.swift +++ b/Tests/RxCocoaTests/SharedSequence+Test.swift @@ -7,10 +7,10 @@ // import Dispatch -import RxSwift import RxCocoa -import XCTest +import RxSwift import RxTest +import XCTest class SharedSequenceTest: RxTest { var backgroundScheduler = SerialDispatchQueueScheduler(qos: .default) @@ -26,11 +26,15 @@ class SharedSequenceTest: RxTest { // * events are observed on main thread - observe(on:MainScheduler.instance) // * it can't error out - it needs to have catch somewhere extension SharedSequenceTest { - func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(_ xs: SharedSequence, expectationFulfilled: @escaping (Result) -> Bool = { _ in false }, subscribedOnBackground: () -> Void) -> [Result] { + func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription( + _ xs: SharedSequence, + expectationFulfilled: @escaping (Result) -> Bool = { _ in false }, + subscribedOnBackground: () -> Void + ) -> [Result] { var firstElements = [Result]() var secondElements = [Result]() - let subscribeFinished = self.expectation(description: "subscribeFinished") + let subscribeFinished = expectation(description: "subscribeFinished") var expectation1: XCTestExpectation! var expectation2: XCTestExpectation! @@ -43,13 +47,13 @@ extension SharedSequenceTest { XCTAssertTrue(DispatchQueue.isMain) } switch e { - case .next(let element): + case let .next(element): firstElements.append(element) if expectationFulfilled(element) { expectation1.fulfill() firstSubscriptionFuture.dispose() } - case .error(let error): + case let .error(error): XCTFail("Error passed \(error)") case .completed: expectation1.fulfill() @@ -65,13 +69,13 @@ extension SharedSequenceTest { XCTAssertTrue(DispatchQueue.isMain) } switch e { - case .next(let element): + case let .next(element): secondElements.append(element) if expectationFulfilled(element) { expectation2.fulfill() secondSubscriptionFuture.dispose() } - case .error(let error): + case let .error(error): XCTFail("Error passed \(error)") case .completed: expectation2.fulfill() @@ -96,15 +100,72 @@ extension SharedSequenceTest { XCTAssertTrue(error == nil) } - expectation1 = self.expectation(description: "finished1") - expectation2 = self.expectation(description: "finished2") + expectation1 = expectation(description: "finished1") + expectation2 = expectation(description: "finished2") subscribedOnBackground() waitForExpectations(timeout: 1.0) { error in XCTAssertTrue(error == nil) } - + return firstElements } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testDriverWorksOnMainActor() async { + for await value in await Observable.just(1) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .default)) + .asDriver(onErrorDriveWith: .empty()) + .map({ @MainActor one in + MainActor.shared.assertIsolated() + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testSignalWorksOnMainActor() async { + for await value in await Observable.just(1) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .default)) + .asSignal(onErrorSignalWith: .empty()) + .map({ @MainActor one in + MainActor.shared.assertIsolated() + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testBackgroundSharingSequence() async { + func testBackgroundSharingSequence() async { + for await value in await Observable.just(1) + .asSharedSequence( + sharingStrategy: BackgroundSharingStrategy.self, + onErrorRecover: { _ in .empty() }) + .map({ one in + if Thread.isMainThread { + return 0 + } + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + } } + +private struct BackgroundSharingStrategy: SharingStrategyProtocol { + public static var scheduler: SchedulerType { ConcurrentDispatchQueueScheduler(qos: .default) } + + public static func share(_ source: Observable) -> Observable { + source.share(scope: .whileConnected) + } +} + +private typealias TestSequence = SharedSequence From c964cffff405d58b54f585af13f6cb28689d88b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=BCcke?= <317346+fabianmuecke@users.noreply.github.com> Date: Mon, 28 Oct 2024 12:04:42 +0100 Subject: [PATCH 3/4] Revert accidental file reformat. --- Tests/RxCocoaTests/SharedSequence+Test.swift | 26 +++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/Tests/RxCocoaTests/SharedSequence+Test.swift b/Tests/RxCocoaTests/SharedSequence+Test.swift index afd3095bd..74aa6cbd4 100644 --- a/Tests/RxCocoaTests/SharedSequence+Test.swift +++ b/Tests/RxCocoaTests/SharedSequence+Test.swift @@ -7,10 +7,10 @@ // import Dispatch -import RxCocoa import RxSwift -import RxTest +import RxCocoa import XCTest +import RxTest class SharedSequenceTest: RxTest { var backgroundScheduler = SerialDispatchQueueScheduler(qos: .default) @@ -26,15 +26,11 @@ class SharedSequenceTest: RxTest { // * events are observed on main thread - observe(on:MainScheduler.instance) // * it can't error out - it needs to have catch somewhere extension SharedSequenceTest { - func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription( - _ xs: SharedSequence, - expectationFulfilled: @escaping (Result) -> Bool = { _ in false }, - subscribedOnBackground: () -> Void - ) -> [Result] { + func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(_ xs: SharedSequence, expectationFulfilled: @escaping (Result) -> Bool = { _ in false }, subscribedOnBackground: () -> Void) -> [Result] { var firstElements = [Result]() var secondElements = [Result]() - let subscribeFinished = expectation(description: "subscribeFinished") + let subscribeFinished = self.expectation(description: "subscribeFinished") var expectation1: XCTestExpectation! var expectation2: XCTestExpectation! @@ -47,13 +43,13 @@ extension SharedSequenceTest { XCTAssertTrue(DispatchQueue.isMain) } switch e { - case let .next(element): + case .next(let element): firstElements.append(element) if expectationFulfilled(element) { expectation1.fulfill() firstSubscriptionFuture.dispose() } - case let .error(error): + case .error(let error): XCTFail("Error passed \(error)") case .completed: expectation1.fulfill() @@ -69,13 +65,13 @@ extension SharedSequenceTest { XCTAssertTrue(DispatchQueue.isMain) } switch e { - case let .next(element): + case .next(let element): secondElements.append(element) if expectationFulfilled(element) { expectation2.fulfill() secondSubscriptionFuture.dispose() } - case let .error(error): + case .error(let error): XCTFail("Error passed \(error)") case .completed: expectation2.fulfill() @@ -100,15 +96,15 @@ extension SharedSequenceTest { XCTAssertTrue(error == nil) } - expectation1 = expectation(description: "finished1") - expectation2 = expectation(description: "finished2") + expectation1 = self.expectation(description: "finished1") + expectation2 = self.expectation(description: "finished2") subscribedOnBackground() waitForExpectations(timeout: 1.0) { error in XCTAssertTrue(error == nil) } - + return firstElements } From 3d8e78dc88417c677f8df8a918f527665e9ba09c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=BCcke?= <317346+fabianmuecke@users.noreply.github.com> Date: Mon, 28 Oct 2024 14:18:49 +0100 Subject: [PATCH 4/4] Removed @MainActor from functions where it's only needed for the closure. --- .../SharedSequence+Operators+MainActor.swift | 30 +++++++++---------- .../Traits/Signal/Signal+Subscription.swift | 4 +-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift index e72a4ebe7..42bf4e680 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift @@ -17,7 +17,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter selector: A transform function to apply to each source element. - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. */ - @preconcurrency @MainActor + @preconcurrency public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { let source = self .asObservable() @@ -36,7 +36,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. */ - @preconcurrency @MainActor + @preconcurrency public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { let source = self .asObservable() @@ -53,7 +53,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter predicate: A function to test each source element for a condition. - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. */ - @preconcurrency @MainActor + @preconcurrency public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { let source = self .asObservable() @@ -95,7 +95,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. */ - @preconcurrency @MainActor + @preconcurrency public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self @@ -115,7 +115,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. */ - @preconcurrency @MainActor + @preconcurrency public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self @@ -139,7 +139,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. - returns: The source sequence with the side-effecting behavior applied. */ - @preconcurrency @MainActor + @preconcurrency public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) -> SharedSequence { let source = self.asObservable() @@ -190,7 +190,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter keySelector: A function to compute the comparison key for each element. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. */ - @preconcurrency @MainActor + @preconcurrency public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) @@ -203,7 +203,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. */ - @preconcurrency @MainActor + @preconcurrency public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged({ $0 }, comparer: comparer) @@ -217,7 +217,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. */ - @preconcurrency @MainActor + @preconcurrency public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: comparer) @@ -235,7 +235,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter selector: A transform function to apply to each element. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. */ - @preconcurrency @MainActor + @preconcurrency public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { let source = self.asObservable() .flatMap(selector) @@ -365,7 +365,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter accumulator: An accumulator function to be invoked on each element. - returns: An observable sequence containing the accumulated values. */ - @preconcurrency @MainActor + @preconcurrency public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) -> SharedSequence { let source = self.asObservable() @@ -409,7 +409,7 @@ extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - @preconcurrency @MainActor + @preconcurrency public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) @@ -437,7 +437,7 @@ extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol - parameter resultSelector: Function to invoke whenever any of the sources produces an element. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - @preconcurrency @MainActor + @preconcurrency public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) @@ -469,7 +469,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. */ - @preconcurrency @MainActor + @preconcurrency public func withUnretained( _ obj: Object, resultSelector: @escaping @MainActor (Object, Element) -> Out @@ -517,7 +517,7 @@ extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingS - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. */ - @preconcurrency @MainActor + @preconcurrency public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { let source = self.asObservable() .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) diff --git a/RxCocoa/Traits/Signal/Signal+Subscription.swift b/RxCocoa/Traits/Signal/Signal+Subscription.swift index 6444ede1f..bb4f23f2d 100644 --- a/RxCocoa/Traits/Signal/Signal+Subscription.swift +++ b/RxCocoa/Traits/Signal/Signal+Subscription.swift @@ -130,7 +130,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ - @preconcurrency @MainActor + @preconcurrency public func emit( with object: Object, onNext: (@MainActor (Object, Element) -> Void)? = nil, @@ -157,7 +157,7 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt gracefully completed, errored, or if the generation is canceled by disposing subscription) - returns: Subscription object used to unsubscribe from the observable sequence. */ - @preconcurrency @MainActor + @preconcurrency public func emit( onNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil,