From a0c3a8ce2d14fd7adee926e2313c579738028966 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Mon, 13 Nov 2023 23:45:16 +0100 Subject: [PATCH 1/5] Implement merge_each --- include/exec/sequence/merge_each.hpp | 633 +++++++++++++++++++++++++ test/CMakeLists.txt | 1 + test/exec/sequence/test_merge_each.cpp | 33 ++ 3 files changed, 667 insertions(+) create mode 100644 include/exec/sequence/merge_each.hpp create mode 100644 test/exec/sequence/test_merge_each.cpp diff --git a/include/exec/sequence/merge_each.hpp b/include/exec/sequence/merge_each.hpp new file mode 100644 index 000000000..59347e88d --- /dev/null +++ b/include/exec/sequence/merge_each.hpp @@ -0,0 +1,633 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../env.hpp" +#include "../sequence_senders.hpp" + +#include + +namespace exec { + namespace __merge_each { + using namespace stdexec; + + struct __default_stop_callback { + stdexec::in_place_stop_source& __stop_source_; + + void operator()() const noexcept { + __stop_source_.request_stop(); + } + }; + + template + struct __error_visitor { + _Receiver& __receiver_; + + template + void operator()(_Error&& __error) const noexcept { + if constexpr (__not_decays_to<_Error, std::monostate>) { + stdexec::set_error(static_cast<_Receiver&&>(__receiver_), static_cast<_Error&&>(__error)); + } + } + }; + + template + struct __operation_base { + using __stop_token_t = stop_token_of_t>; + using __default_stop_callback_t = + typename __stop_token_t::template callback_type<__default_stop_callback>; + + void __notify_completion() noexcept { + if (__n_pending_ops_.fetch_sub(1, std::memory_order_relaxed) == 1) { + __on_receiver_stopped_.reset(); + int __error_emplaced = __error_emplaced_.load(std::memory_order_acquire); + if (__error_emplaced == 2) { + std::visit( + __error_visitor<_Receiver>{__receiver_}, static_cast<_ErrorsVariant&&>(__errors_)); + } else { + exec::__set_value_unless_stopped(static_cast<_Receiver&&>(__receiver_)); + } + } + } + + template + void __emplace_error(Error&& error) noexcept { + int __expected = 0; + if (__error_emplaced_.compare_exchange_strong(__expected, 1, std::memory_order_relaxed)) { + __errors_.template emplace<__decay_t>(static_cast(error)); + __error_emplaced_.store(2, std::memory_order_release); + } + } + + std::atomic __n_pending_ops_; + _Receiver __receiver_; + _ErrorsVariant __errors_{}; + std::atomic __error_emplaced_{0}; + in_place_stop_source __stop_source_{}; + std::optional<__default_stop_callback_t> __on_receiver_stopped_{}; + }; + + template + struct __next_receiver { + class __t; + }; + + template + concept __emplaceable = requires(_Variant&& __variant, _Args&&... __args) { + { __variant.template emplace<_Tp>(static_cast<_Args&&>(__args)...) }; + }; + + template + class __next_receiver<_ReceiverId, _ErrorsVariant>::__t { + using _Receiver = stdexec::__t<_ReceiverId>; + public: + using __id = __next_receiver; + using is_receiver = void; + + explicit __t(__operation_base<_Receiver, _ErrorsVariant>* __parent) noexcept + : __op_{__parent} { + } + + private: + __operation_base<_Receiver, _ErrorsVariant>* __op_; + + template _SetNext, same_as<__t> _Self, class _Item> + requires __callable<_SetNext, _Receiver&, _Item> + friend auto tag_invoke(_SetNext, _Self& __self, _Item&& __item) noexcept( + __nothrow_callable<_SetNext, _Receiver&, _Item>) -> next_sender_of_t<_Receiver&, _Item> { + return exec::set_next(__self.__op_->__receiver_, static_cast<_Item&&>(__item)); + } + + template _SetValue, same_as<__t> _Self> + friend void tag_invoke(_SetValue, _Self&& __self) noexcept { + __self.__op_->__notify_completion(); + } + + template _SetStopped, same_as<__t> _Self> + friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + __self.__op_->__notify_completion(); + } + + template _SetError, same_as<__t> _Self, class _Error> + requires __callable // + && __emplaceable<_ErrorsVariant, __decay_t<_Error>, _Error> + friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + __self.__op_->__emplace_error(static_cast<_Error&&>(__error)); + __self.__op_->__stop_source_.request_stop(); + __self.__op_->__notify_completion(); + } + + template _GetEnv, __decays_to<__t> _Self> + friend auto tag_invoke(_GetEnv, _Self&& __self) noexcept + -> make_env_t, with_t> { + return exec::make_env( + stdexec::get_env(__self.__op_->__receiver_), + exec::with(get_stop_token, __self.__op_->__stop_source_.get_token())); + } + }; + + template + struct __traits { + using __env = env_of_t<_Receiver>; + + using __errors_variant = // + __minvoke<__mconcat<__nullable_variant_t>, error_types_of_t<_Senders, __env, __types>...>; + + using __next_receiver_t = __t<__next_receiver<__id<_Receiver>, __errors_variant>>; + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + using _Base = + __operation_base<_Receiver, typename __traits<_Receiver, _Senders...>::__errors_variant>; + class __t; + }; + + template + class __operation<_ReceiverId, _Senders...>::__t + : __operation<_ReceiverId, _Senders...>::_Base { + using __next_receiver_t = typename __traits<_Receiver, _Senders...>::__next_receiver_t; + std::tuple...> __ops_; + + public: + __t(_Receiver __rcvr, _Senders&&... __sndrs) + : _Base{sizeof...(_Senders), static_cast<_Receiver&&>(__rcvr)} + , __ops_{__conv{[&] { + return exec::subscribe(static_cast<_Senders&&>(__sndrs), __next_receiver_t{this}); + }}...} { + } + + friend void tag_invoke(stdexec::start_t, __t& __self) noexcept { + __apply([](auto&... __op) { (stdexec::start(__op), ...); }, __self.__ops_); + } + }; + + template + using __item_completion_signatures_of_t = + __concat_item_signatures_t, _Env>; + + template + using __single_item_value_t = __gather_signal< + set_value_t, + __item_completion_signatures_of_t<_Sequence, _Env>, + __msingle_or, + __q<__msingle>>; + + template + concept __sequence_factory = // + sizeof...(_Senders) == 1 && // + __single_typed_sender<__mfront<_Senders...>, _Env> && // + sequence_sender_in<__single_item_value_t<__mfront<_Senders...>, _Env>, _Env>; + + template + struct __dynamic_item_stop { + __operation_base<_Receiver, _ErrorsVariant>* __parent_; + + void operator()() const noexcept { + __parent_->__stop_source_.request_stop(); + __parent_->__notify_completion(); + } + }; + + template + struct __dynamic_item_operation_base { + _ItemReceiver __item_receiver_; + using __stop_token_t = stop_token_of_t>; + using __stop_callback_t = typename __stop_token_t::template callback_type< + __dynamic_item_stop<_Receiver, _ErrorsVariant>>; + std::optional<__stop_callback_t> __on_item_receiver_stopped_{}; + __operation_base<_Receiver, _ErrorsVariant>* __parent_; + }; + + template + struct __dynamic_next_receiver { + class __t; + }; + + template + class __dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>::__t { + public: + using __id = __dynamic_next_receiver; + using is_receiver = void; + + private: + using _Receiver = stdexec::__t<_ReceiverId>; + using _ItemReceiver = stdexec::__t<_ItemReceiverId>; + __dynamic_item_operation_base<_ItemReceiver, _Receiver, _ErrorsVariant>* __op_; + + template _GetEnv, __decays_to<__t> _Self> + friend auto tag_invoke(_GetEnv, _Self&& __self) noexcept + -> make_env_t, with_t> { + return exec::make_env( + stdexec::get_env(__self.__op_->__item_receiver_), + exec::with(get_stop_token, __self.__op_->__parent_->__stop_source_.get_token())); + } + + template _SetNext, same_as<__t> _Self, class _Sender> + requires __callable<_SetNext, _Receiver&, _Sender> + friend auto tag_invoke(_SetNext, _Self& __self, _Sender&& sender) noexcept( + __nothrow_callable<_SetNext, _Receiver&, _Sender>) -> next_sender_of_t<_Receiver, _Sender> { + return exec::set_next(__self.__op_->__parent_->__receiver_, static_cast<_Sender&&>(sender)); + } + + template _SetValue, same_as<__t> _Self> + requires __callable<_SetValue, _ItemReceiver&&> + friend void tag_invoke(_SetValue, _Self&& __self) noexcept { + __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + stdexec::set_value(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); + __parent->__notify_completion(); + } + + template _SetStopped, same_as<__t> _Self> + requires __callable<_SetStopped, _ItemReceiver&&> + friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); + __parent->__notify_completion(); + } + + template _SetError, same_as<__t> _Self, class _Error> + requires __callable + friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + __parent->__emplace_error(static_cast<_Error&&>(__error)); + __parent->__stop_source_.request_stop(); + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); + __parent->__notify_completion(); + } + }; + + template + struct __subsequence_operation; + + template + struct __subsequence_operation + : __dynamic_item_operation_base< + stdexec::__t<_ItemReceiverId>, + stdexec::__t<_ReceiverId>, + _ErrorsVariant> { + + using _ItemReceiver = stdexec::__t<_ItemReceiverId>; + using _Receiver = stdexec::__t<_ReceiverId>; + using _Subsequence = __single_sender_value_t<_Item>; + + std::optional>> + __op_; + + __subsequence_operation( + _ItemReceiver __item_receiver, + __operation_base<_Receiver, _ErrorsVariant>* __parent) + : __dynamic_item_operation_base<_ItemReceiver, _Receiver, _ErrorsVariant>( + static_cast<_ItemReceiver&&>(__item_receiver), + __parent) { + } + }; + + template + struct __receive_subsequence { + class __t; + }; + + template + class __receive_subsequence<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>::__t { + using _ItemReceiver = stdexec::__t<_ItemReceiverId>; + using _Receiver = stdexec::__t<_ReceiverId>; + using __subsequence_operation_t = + stdexec::__t<__subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>>; + + using __dynamic_next_receiver_t = + stdexec::__t<__dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>>; + + __subsequence_operation_t* __op_; + + template _GetEnv, __decays_to<__t> _Self> + friend auto tag_invoke(_GetEnv, _Self&& __self) noexcept + -> make_env_t, with_t> { + return exec::make_env( + stdexec::get_env(__self.__op_->__item_receiver_), + exec::with(get_stop_token, __self.__op_->__parent_->__stop_source_.get_token())); + } + + template _SetValue, same_as<__t> _Self, class _Subsequence> + requires sequence_sender_to<_Subsequence, __dynamic_next_receiver_t> + friend void tag_invoke(_SetValue, _Self&& __self, _Subsequence&& subsequence) noexcept { + try { + auto& __next_op = __self.__op_->__op_.emplace(stdexec::__conv{[&] { + return exec::subscribe( + static_cast<_Subsequence&&>(subsequence), __dynamic_next_receiver_t{__self.__op_}); + }}); + stdexec::start(__next_op); + } catch (...) { + __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + __parent->__emplace_error(std::current_exception()); + __parent->__stop_source_.request_stop(); + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); + __parent->__notify_completion(); + } + } + + template _SetStopped, same_as<__t> _Self> + friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__receiver_)); + __parent->__notify_completion(); + } + + template _SetError, same_as<__t> _Self, class _Error> + friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + __parent->__emplace_error(static_cast<_Error&&>(__error)); + __parent->__stop_source_.request_stop(); + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__receiver_)); + __parent->__notify_completion(); + } + public: + using __id = __receive_subsequence; + using is_receiver = void; + }; + + template + struct __dynamic_item_operation { + class __t; + }; + + template + class __dynamic_item_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>::__t + : __subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant> { + + using _ItemReceiver = stdexec::__t<_ItemReceiverId>; + using _Receiver = stdexec::__t<_ReceiverId>; + + using _Base = __subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>; + + using __receive_subsequence_t = + stdexec::__t<__receive_subsequence<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>>; + + connect_result_t<_Item, __receive_subsequence_t> __receive_op_; + + public: + explicit __t( + _Item&& __item, + _ItemReceiver __item_receiver, + __operation_base<_Receiver, _ErrorsVariant>* __parent) + : _Base(static_cast<_ItemReceiver&&>(__item_receiver), __parent) + , __receive_op_{ + stdexec::connect(static_cast<_Item&&>(__item), __receive_subsequence_t{this})} { + } + + template _Self> + friend void tag_invoke(stdexec::start_t, _Self& __self) noexcept { + stdexec::start(__self.__receive_op_); + } + }; + + template + struct __dynamic_item_sender { + class __t; + }; + + template + class __dynamic_item_sender<_ItemId, _ReceiverId, _ErrorsVariant>::__t { + using __id = __dynamic_item_sender; + using _Item = stdexec::__t<_ItemId>; + using _Receiver = stdexec::__t<_ReceiverId>; + + template + using __operation_t = stdexec::__t< + __dynamic_item_operation<_Item, stdexec::__id<_ItemReceiver>, _ReceiverId, _ErrorsVariant>>; + + template + using __receive_subsequence_t = stdexec::__t< + __receive_subsequence<_Item, stdexec::__id<_ItemReceiver>, _ReceiverId, _ErrorsVariant>>; + + _Item __item_; + __operation_base<_Receiver, _ErrorsVariant>* __parent_; + + template <__decays_to<__t> _Self, class _ItemReceiver> + requires sender_to<__copy_cvref_t<_Self, _Item>, __receive_subsequence_t<_ItemReceiver>> + friend auto tag_invoke(stdexec::connect_t, _Self&& self, _ItemReceiver __item_receiver) + -> __operation_t<_ItemReceiver> { + return __operation_t<_ItemReceiver>{ + static_cast<_Self&&>(self).__item_, + static_cast<_ItemReceiver&&>(__item_receiver), + self.__parent_}; + } + + public: + using is_sender = void; + using completion_signatures = stdexec::completion_signatures; + + template + explicit __t(_CvItem&& __item, __operation_base<_Receiver, _ErrorsVariant>* __parent) + : __item_(static_cast<_CvItem&&>(__item)) + , __parent_(__parent) { + } + }; + + template + struct __dynamic_receiver { + class __t; + }; + + template + class __dynamic_receiver<_ReceiverId, _ErrorsVariant>::__t { + using _Receiver = stdexec::__t<_ReceiverId>; + + template + using __next_sender_t = stdexec::__t< + __dynamic_item_sender>, _ReceiverId, _ErrorsVariant>>; + + __operation_base<_Receiver, _ErrorsVariant>* __parent_; + + template _GetEnv, __decays_to<__t> _Self> + friend auto tag_invoke(_GetEnv, _Self&& __self) noexcept + -> make_env_t, with_t> { + return exec::make_env( + stdexec::get_env(__self.__parent_->__receiver_), + exec::with(get_stop_token, __self.__parent_->__stop_source_.get_token())); + } + + template _SetNext, same_as<__t> _Self, class _Item> + friend auto tag_invoke(_SetNext, _Self& __self, _Item&& __item) noexcept( + __nothrow_decay_copyable<_Item>) // + -> __next_sender_t<_Item> { + return __next_sender_t<_Item>{static_cast<_Item&&>(__item), __self.__parent_}; + } + + template _SetValue, same_as<__t> _Self> + friend void tag_invoke(_SetValue, _Self&& __self) noexcept { + int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire); + if (__error_emplaced == 2) { + std::visit( + __error_visitor<_Receiver>{&__self.__parent_->__receiver_}, + static_cast<_ErrorsVariant&&>(__self.__parent_->__errors_)); + } else { + stdexec::set_value(static_cast<_Receiver&&>(__self.__parent_->__receiver_)); + } + } + + template _SetStopped, same_as<__t> _Self> + friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire); + if (__error_emplaced == 2) { + std::visit( + __error_visitor<_Receiver>{&__self.__parent_->__receiver_}, + static_cast<_ErrorsVariant&&>(__self.__parent_->__errors_)); + } else { + exec::__set_value_unless_stopped(static_cast<_Receiver&&>(__self.__parent_->__receiver_)); + } + } + + template _SetError, same_as<__t> _Self, class _Error> + friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + stdexec::set_error( + static_cast<_Receiver&&>(__self.__parent_->__receiver_), static_cast<_Error&&>(__error)); + } + + public: + using __id = __dynamic_receiver; + using is_receiver = void; + + explicit __t(__operation_base<_Receiver, _ErrorsVariant>* __parent) noexcept + : __parent_{__parent} { + } + }; + + template + struct __dynamic_operation { + class __t; + }; + + template + class __dynamic_operation<_Sender, _ReceiverId, _ErrorsVariant>::__t + : __operation_base, _ErrorsVariant> { + using _Receiver = stdexec::__t<_ReceiverId>; + + template _Self> + friend void tag_invoke(stdexec::start_t, _Self& __self) noexcept { + stdexec::start(__self.__op_); + } + + subscribe_result_t<_Sender, __dynamic_receiver<_ReceiverId, _ErrorsVariant>> __op_; + + public: + __t(_Sender&& sndr, _Receiver rcvr) + : __operation_base<_Receiver, _ErrorsVariant>{1, static_cast<_Receiver&&>(rcvr)} + , __op_{exec::subscribe( + static_cast<_Sender&&>(sndr), + __dynamic_receiver<_ReceiverId, _ErrorsVariant>{this})} { + } + }; + + template + struct __sequence { + class __t; + }; + + template + class __sequence<_Senders...>::__t { + template + using __value_type_t = + __single_item_value_t<__copy_cvref_t<_Self, __mfront<_Senders...>>, _Env>; + + template + using __errors_variant_t = + typename __traits<_Receiver, __copy_cvref_t<_Self, _Senders>...>::__errors_variant; + + template + using __dynamic_operation_t = stdexec::__t<__dynamic_operation< + __copy_cvref_t<_Self, __mfront<_Senders...>>, + stdexec::__id<_Receiver>, + __errors_variant_t<_Self, _Receiver>>>; + + template + using __static_operation_t = + stdexec::__t<__operation< stdexec::__id<_Receiver>, __copy_cvref_t<_Self, _Senders>...>>; + + std::tuple<_Senders...> __senders_; + + template <__decays_to<__t> _Self, class _Receiver> + requires(!__sequence_factory, __copy_cvref_t<_Self, _Senders>...>) + friend auto tag_invoke(subscribe_t, _Self&& self, _Receiver receiver) + -> __static_operation_t<_Self, _Receiver> { + return __apply( + [&](_Sndrs&&... sndrs) { + return __static_operation_t<_Self, _Receiver>{ + static_cast<_Receiver&&>(receiver), static_cast<_Sndrs&&>(sndrs)...}; + }, + static_cast<_Self&&>(self).__senders_); + } + + template <__decays_to<__t> _Self, class _Receiver> + requires __sequence_factory, __copy_cvref_t<_Self, _Senders>...> + friend auto tag_invoke(subscribe_t, _Self&& self, _Receiver receiver) // + -> __dynamic_operation_t<_Self, _Receiver> { + return __dynamic_operation_t{ + std::get<0>(static_cast<_Self&&>(self).__senders_), static_cast<_Receiver&&>(receiver)}; + } + + template <__decays_to<__t> _Self, class _Env> + requires(!__sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>) + friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) + -> __concat_completion_signatures_t< + __to_sequence_completion_signatures<__copy_cvref_t<_Self, _Senders>, _Env>...>; + + template <__decays_to<__t> _Self, class _Env> + requires __sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...> + friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) + -> __to_sequence_completion_signatures<__value_type_t<_Self, _Env>, _Env>; + + template <__decays_to<__t> _Self, class _Env> + requires(!__sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>) + friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) -> __minvoke< + __mconcat<__q>, + item_types_of_t<__copy_cvref_t<_Self, _Senders>, _Env>...>; + + // template <__decays_to<__t> _Self, class _Env> + // requires __sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...> + // friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) + // -> item_types_of_t<__value_type_t<_Self, _Env>, _Env>; + + public: + using __id = __sequence; + using is_sender = exec::sequence_tag; + + __t(_Senders&&... sndrs) + : __senders_{static_cast<_Senders&&>(sndrs)...} { + } + }; + + struct merge_each_t { + template + auto operator()(_Senders&&... senders) const + noexcept((__nothrow_decay_copyable<_Senders> && ...)) + -> __t<__sequence<__decay_t<_Senders>...>> { + return {static_cast<_Senders&&>(senders)...}; + } + + auto operator()() const noexcept -> __binder_back { + return {{}, {}, {}}; + } + }; + } // namespace __merge_each + + using __merge_each::merge_each_t; + + inline constexpr merge_each_t merge_each{}; +} \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 4954b9f41..d899a06d6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -86,6 +86,7 @@ set(stdexec_test_sources exec/sequence/test_empty_sequence.cpp exec/sequence/test_ignore_all_values.cpp exec/sequence/test_iterate.cpp + exec/sequence/test_merge_each.cpp exec/sequence/test_transform_each.cpp $<$:tbbexec/test_tbb_thread_pool.cpp> ) diff --git a/test/exec/sequence/test_merge_each.cpp b/test/exec/sequence/test_merge_each.cpp new file mode 100644 index 000000000..205a5abe8 --- /dev/null +++ b/test/exec/sequence/test_merge_each.cpp @@ -0,0 +1,33 @@ +#include "exec/sequence/merge_each.hpp" + +#include "exec/sequence/transform_each.hpp" +#include "exec/sequence/ignore_all_values.hpp" + +#include + +struct then_each_t { + template + auto operator()(Sequence&& sequence, Fn fn) const noexcept { + return exec::transform_each(static_cast(sequence), stdexec::then(fn)); + } + + template + stdexec::__binder_back operator()(Fn fn) const noexcept { + return {{}, {}, {static_cast(fn)}}; + } +}; + +inline constexpr then_each_t then_each; + +TEST_CASE("merge_each - with plain senders", "[sequence_senders][merge_each]") { + int checked = 0; + auto s1 = // + exec::merge_each(stdexec::just(42)) // + | then_each([&](int x) noexcept { + CHECK(x == 42); + ++checked; + }) + | exec::ignore_all_values(); + stdexec::sync_wait(s1); + CHECK(checked == 1); +} \ No newline at end of file From df5c4b470ebb64be2202de3d6c57a9fb8c7f19b7 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Tue, 14 Nov 2023 00:02:41 +0100 Subject: [PATCH 2/5] Fix gcc --- include/exec/sequence/merge_each.hpp | 10 +++++----- include/stdexec/__detail/__basic_sender.hpp | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/exec/sequence/merge_each.hpp b/include/exec/sequence/merge_each.hpp index 59347e88d..e7070478a 100644 --- a/include/exec/sequence/merge_each.hpp +++ b/include/exec/sequence/merge_each.hpp @@ -578,7 +578,7 @@ namespace exec { requires __sequence_factory, __copy_cvref_t<_Self, _Senders>...> friend auto tag_invoke(subscribe_t, _Self&& self, _Receiver receiver) // -> __dynamic_operation_t<_Self, _Receiver> { - return __dynamic_operation_t{ + return __dynamic_operation_t<_Self, _Receiver>{ std::get<0>(static_cast<_Self&&>(self).__senders_), static_cast<_Receiver&&>(receiver)}; } @@ -599,10 +599,10 @@ namespace exec { __mconcat<__q>, item_types_of_t<__copy_cvref_t<_Self, _Senders>, _Env>...>; - // template <__decays_to<__t> _Self, class _Env> - // requires __sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...> - // friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) - // -> item_types_of_t<__value_type_t<_Self, _Env>, _Env>; + template <__decays_to<__t> _Self, class _Env> + requires __sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...> + friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) + -> item_types_of_t<__value_type_t<_Self, _Env>, _Env>; public: using __id = __sequence; diff --git a/include/stdexec/__detail/__basic_sender.hpp b/include/stdexec/__detail/__basic_sender.hpp index b72e3e8bd..e2976e7ef 100644 --- a/include/stdexec/__detail/__basic_sender.hpp +++ b/include/stdexec/__detail/__basic_sender.hpp @@ -55,7 +55,7 @@ namespace stdexec { }; STDEXEC_PRAGMA_PUSH() - STDEXEC_PRAGMA_IGNORE_GNU("-Wunused-local-typedef") + STDEXEC_PRAGMA_IGNORE_GNU("-Wunused-local-typedefs") struct __get_meta { template From f57e5a2b0297623e23e02391148665d893bfef43 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Tue, 14 Nov 2023 13:43:47 +0100 Subject: [PATCH 3/5] Add a test for the dynamic version --- include/exec/sequence/merge_each.hpp | 59 +++++++++++++++++--------- test/exec/sequence/test_merge_each.cpp | 46 +++++++++++++++++--- 2 files changed, 77 insertions(+), 28 deletions(-) diff --git a/include/exec/sequence/merge_each.hpp b/include/exec/sequence/merge_each.hpp index e7070478a..042636933 100644 --- a/include/exec/sequence/merge_each.hpp +++ b/include/exec/sequence/merge_each.hpp @@ -206,11 +206,11 @@ namespace exec { template struct __dynamic_item_operation_base { _ItemReceiver __item_receiver_; + __operation_base<_Receiver, _ErrorsVariant>* __parent_; using __stop_token_t = stop_token_of_t>; using __stop_callback_t = typename __stop_token_t::template callback_type< __dynamic_item_stop<_Receiver, _ErrorsVariant>>; std::optional<__stop_callback_t> __on_item_receiver_stopped_{}; - __operation_base<_Receiver, _ErrorsVariant>* __parent_; }; template @@ -220,10 +220,6 @@ namespace exec { template class __dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>::__t { - public: - using __id = __dynamic_next_receiver; - using is_receiver = void; - private: using _Receiver = stdexec::__t<_ReceiverId>; using _ItemReceiver = stdexec::__t<_ItemReceiverId>; @@ -240,7 +236,7 @@ namespace exec { template _SetNext, same_as<__t> _Self, class _Sender> requires __callable<_SetNext, _Receiver&, _Sender> friend auto tag_invoke(_SetNext, _Self& __self, _Sender&& sender) noexcept( - __nothrow_callable<_SetNext, _Receiver&, _Sender>) -> next_sender_of_t<_Receiver, _Sender> { + __nothrow_callable<_SetNext, _Receiver&, _Sender>) { // -> next_sender_of_t<_Receiver, _Sender> { return exec::set_next(__self.__op_->__parent_->__receiver_, static_cast<_Sender&&>(sender)); } @@ -269,6 +265,14 @@ namespace exec { stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); __parent->__notify_completion(); } + public: + using __id = __dynamic_next_receiver; + using is_receiver = void; + + explicit __t( + __dynamic_item_operation_base<_ItemReceiver, _Receiver, _ErrorsVariant>* __op) noexcept + : __op_{__op} { + } }; template @@ -287,7 +291,7 @@ namespace exec { std::optional>> + stdexec::__t<__dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>>>> __op_; __subsequence_operation( @@ -309,7 +313,7 @@ namespace exec { using _ItemReceiver = stdexec::__t<_ItemReceiverId>; using _Receiver = stdexec::__t<_ReceiverId>; using __subsequence_operation_t = - stdexec::__t<__subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>>; + __subsequence_operation<_Item, _ItemReceiverId, _ReceiverId, _ErrorsVariant>; using __dynamic_next_receiver_t = stdexec::__t<__dynamic_next_receiver<_ItemReceiverId, _ReceiverId, _ErrorsVariant>>; @@ -345,7 +349,7 @@ namespace exec { template _SetStopped, same_as<__t> _Self> friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; - stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__receiver_)); + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); __parent->__notify_completion(); } @@ -354,12 +358,16 @@ namespace exec { __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; __parent->__emplace_error(static_cast<_Error&&>(__error)); __parent->__stop_source_.request_stop(); - stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__receiver_)); + stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); __parent->__notify_completion(); } public: using __id = __receive_subsequence; using is_receiver = void; + + explicit __t(__subsequence_operation_t* __op) noexcept + : __op_{__op} { + } }; template @@ -475,7 +483,7 @@ namespace exec { int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire); if (__error_emplaced == 2) { std::visit( - __error_visitor<_Receiver>{&__self.__parent_->__receiver_}, + __error_visitor<_Receiver>{__self.__parent_->__receiver_}, static_cast<_ErrorsVariant&&>(__self.__parent_->__errors_)); } else { stdexec::set_value(static_cast<_Receiver&&>(__self.__parent_->__receiver_)); @@ -487,7 +495,7 @@ namespace exec { int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire); if (__error_emplaced == 2) { std::visit( - __error_visitor<_Receiver>{&__self.__parent_->__receiver_}, + __error_visitor<_Receiver>{__self.__parent_->__receiver_}, static_cast<_ErrorsVariant&&>(__self.__parent_->__errors_)); } else { exec::__set_value_unless_stopped(static_cast<_Receiver&&>(__self.__parent_->__receiver_)); @@ -524,14 +532,15 @@ namespace exec { stdexec::start(__self.__op_); } - subscribe_result_t<_Sender, __dynamic_receiver<_ReceiverId, _ErrorsVariant>> __op_; + subscribe_result_t<_Sender, stdexec::__t<__dynamic_receiver<_ReceiverId, _ErrorsVariant>>> + __op_; public: __t(_Sender&& sndr, _Receiver rcvr) : __operation_base<_Receiver, _ErrorsVariant>{1, static_cast<_Receiver&&>(rcvr)} , __op_{exec::subscribe( static_cast<_Sender&&>(sndr), - __dynamic_receiver<_ReceiverId, _ErrorsVariant>{this})} { + stdexec::__t<__dynamic_receiver<_ReceiverId, _ErrorsVariant>>{this})} { } }; @@ -593,16 +602,24 @@ namespace exec { friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) -> __to_sequence_completion_signatures<__value_type_t<_Self, _Env>, _Env>; - template <__decays_to<__t> _Self, class _Env> - requires(!__sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>) - friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) -> __minvoke< - __mconcat<__q>, - item_types_of_t<__copy_cvref_t<_Self, _Senders>, _Env>...>; + template + static auto get_item_types() noexcept { + if constexpr (!__sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...>) { + using _Result = __minvoke< + __mconcat<__q>, + item_types_of_t<__copy_cvref_t<_Self, _Senders>, _Env>...>; + return (_Result(*)()) nullptr; + } else { + using _Result = item_types_of_t<__value_type_t<_Self, _Env>, _Env>; + return (_Result(*)()) nullptr; + } + } template <__decays_to<__t> _Self, class _Env> - requires __sequence_factory<_Env, __copy_cvref_t<_Self, _Senders>...> friend auto tag_invoke(get_item_types_t, _Self&&, _Env&&) - -> item_types_of_t<__value_type_t<_Self, _Env>, _Env>; + -> decltype(get_item_types<_Self, _Env>()()) { + return {}; + } public: using __id = __sequence; diff --git a/test/exec/sequence/test_merge_each.cpp b/test/exec/sequence/test_merge_each.cpp index 205a5abe8..0a3108fea 100644 --- a/test/exec/sequence/test_merge_each.cpp +++ b/test/exec/sequence/test_merge_each.cpp @@ -2,6 +2,7 @@ #include "exec/sequence/transform_each.hpp" #include "exec/sequence/ignore_all_values.hpp" +#include "exec/sequence/iterate.hpp" #include @@ -21,13 +22,44 @@ inline constexpr then_each_t then_each; TEST_CASE("merge_each - with plain senders", "[sequence_senders][merge_each]") { int checked = 0; - auto s1 = // - exec::merge_each(stdexec::just(42)) // - | then_each([&](int x) noexcept { - CHECK(x == 42); + SECTION("one just") { + auto s1 = // + exec::merge_each(stdexec::just(42)) // + | then_each([&](int x) noexcept { + CHECK(x == 42); + ++checked; + }) + | exec::ignore_all_values(); + stdexec::sync_wait(s1); + CHECK(checked == 1); + } + SECTION("two senders") { + auto s1 = // + exec::merge_each( + stdexec::just(42), // + stdexec::just(43)) // + | then_each([&](int x) noexcept { + CHECK(x == 42 + checked); + ++checked; + }) + | exec::ignore_all_values(); + stdexec::sync_wait(s1); + CHECK(checked == 2); + } +} + +TEST_CASE("merge_each - with iterate", "[sequence_senders][merge_each]") { + std::array arr = {1, 2, 3}; + auto view = std::views::all(arr); + int checked = 0; + auto s1 = // + exec::iterate(view) // + | then_each([=](int x) noexcept { return exec::iterate(std::views::iota(0, x)); }) // + | exec::merge_each() // + | then_each([&](int) noexcept { ++checked; - }) - | exec::ignore_all_values(); + }) // + | exec::ignore_all_values(); // stdexec::sync_wait(s1); - CHECK(checked == 1); + CHECK(checked == 6); } \ No newline at end of file From b3e7675aaf7be87be00c0f11a6c71aaaa59f6605 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Thu, 16 Nov 2023 18:56:18 +0100 Subject: [PATCH 4/5] Properly construct and reset stop callback --- include/exec/sequence/merge_each.hpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/include/exec/sequence/merge_each.hpp b/include/exec/sequence/merge_each.hpp index 042636933..4d8657196 100644 --- a/include/exec/sequence/merge_each.hpp +++ b/include/exec/sequence/merge_each.hpp @@ -243,6 +243,7 @@ namespace exec { template _SetValue, same_as<__t> _Self> requires __callable<_SetValue, _ItemReceiver&&> friend void tag_invoke(_SetValue, _Self&& __self) noexcept { + __self.__op_->__on_item_receiver_stopped_.reset(); __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; stdexec::set_value(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); __parent->__notify_completion(); @@ -251,6 +252,7 @@ namespace exec { template _SetStopped, same_as<__t> _Self> requires __callable<_SetStopped, _ItemReceiver&&> friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + __self.__op_->__on_item_receiver_stopped_.reset(); __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); __parent->__notify_completion(); @@ -259,6 +261,7 @@ namespace exec { template _SetError, same_as<__t> _Self, class _Error> requires __callable friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + __self.__op_->__on_item_receiver_stopped_.reset(); __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; __parent->__emplace_error(static_cast<_Error&&>(__error)); __parent->__stop_source_.request_stop(); @@ -339,6 +342,7 @@ namespace exec { stdexec::start(__next_op); } catch (...) { __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; + __self.__op_->__on_item_receiver_stopped_.reset(); __parent->__emplace_error(std::current_exception()); __parent->__stop_source_.request_stop(); stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); @@ -348,6 +352,7 @@ namespace exec { template _SetStopped, same_as<__t> _Self> friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + __self.__op_->__on_item_receiver_stopped_.reset(); __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; stdexec::set_stopped(static_cast<_ItemReceiver&&>(__self.__op_->__item_receiver_)); __parent->__notify_completion(); @@ -355,6 +360,7 @@ namespace exec { template _SetError, same_as<__t> _Self, class _Error> friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + __self.__op_->__on_item_receiver_stopped_.reset(); __operation_base<_Receiver, _ErrorsVariant>* __parent = __self.__op_->__parent_; __parent->__emplace_error(static_cast<_Error&&>(__error)); __parent->__stop_source_.request_stop(); @@ -401,6 +407,9 @@ namespace exec { template _Self> friend void tag_invoke(stdexec::start_t, _Self& __self) noexcept { + __self.__on_item_receiver_stopped_.emplace( + stdexec::get_stop_token(stdexec::get_env(__self.__item_receiver_)), + __dynamic_item_stop<_Receiver, _ErrorsVariant>{__self.__parent_}); stdexec::start(__self.__receive_op_); } }; @@ -480,6 +489,7 @@ namespace exec { template _SetValue, same_as<__t> _Self> friend void tag_invoke(_SetValue, _Self&& __self) noexcept { + __self.__parent_->__on_receiver_stopped_.reset(); int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire); if (__error_emplaced == 2) { std::visit( @@ -492,6 +502,7 @@ namespace exec { template _SetStopped, same_as<__t> _Self> friend void tag_invoke(_SetStopped, _Self&& __self) noexcept { + __self.__parent_->__on_receiver_stopped_.reset(); int __error_emplaced = __self.__parent_->__error_emplaced_.load(std::memory_order_acquire); if (__error_emplaced == 2) { std::visit( @@ -504,6 +515,7 @@ namespace exec { template _SetError, same_as<__t> _Self, class _Error> friend void tag_invoke(_SetError, _Self&& __self, _Error&& __error) noexcept { + __self.__parent_->__on_receiver_stopped_.reset(); stdexec::set_error( static_cast<_Receiver&&>(__self.__parent_->__receiver_), static_cast<_Error&&>(__error)); } @@ -529,6 +541,9 @@ namespace exec { template _Self> friend void tag_invoke(stdexec::start_t, _Self& __self) noexcept { + __self.__on_receiver_stopped_.emplace( + stdexec::get_stop_token(stdexec::get_env(__self.__receiver_)), + __default_stop_callback{__self.__stop_source_}); stdexec::start(__self.__op_); } From 203551a5bb316c30550be4ffbd9ce449624ae2d2 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Thu, 14 Dec 2023 09:38:31 +0100 Subject: [PATCH 5/5] Guard test relying on std ranges --- test/exec/sequence/test_merge_each.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/exec/sequence/test_merge_each.cpp b/test/exec/sequence/test_merge_each.cpp index 0a3108fea..62043e341 100644 --- a/test/exec/sequence/test_merge_each.cpp +++ b/test/exec/sequence/test_merge_each.cpp @@ -48,6 +48,7 @@ TEST_CASE("merge_each - with plain senders", "[sequence_senders][merge_each]") { } } +#if STDEXEC_HAS_STD_RANGES() TEST_CASE("merge_each - with iterate", "[sequence_senders][merge_each]") { std::array arr = {1, 2, 3}; auto view = std::views::all(arr); @@ -62,4 +63,5 @@ TEST_CASE("merge_each - with iterate", "[sequence_senders][merge_each]") { | exec::ignore_all_values(); // stdexec::sync_wait(s1); CHECK(checked == 6); -} \ No newline at end of file +} +#endif \ No newline at end of file