diff --git a/include/exec/__detail/__decl_receiver.hpp b/include/exec/__detail/__decl_receiver.hpp new file mode 100644 index 000000000..3c33e46da --- /dev/null +++ b/include/exec/__detail/__decl_receiver.hpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../../stdexec/__detail/__execution_fwd.hpp" +#include "../../stdexec/__detail/__receivers.hpp" + +#include "../../stdexec/concepts.hpp" + +namespace exec { + +// disable spurious warning in clang +// https://github.com/llvm/llvm-project/issues/61566 +STDEXEC_PRAGMA_PUSH() +STDEXEC_PRAGMA_IGNORE_GNU("-Wundefined-internal") + +// fake receiver used to calculate whether inner connect is nothrow +template +struct __decl_receiver { + using __t = __decl_receiver; + using __id = __decl_receiver; + + using receiver_concept = stdexec::receiver_t; + + template + void set_value_t(_An&&... __an) && noexcept; + + template + void set_error_t(_Error&& __err) && noexcept; + + void set_stopped_t() && noexcept; + + _Env get_env_t() const& noexcept; +}; + +STDEXEC_PRAGMA_POP() + +} // namespace exec diff --git a/include/exec/__detail/__tuple_index_pack.hpp b/include/exec/__detail/__tuple_index_pack.hpp new file mode 100644 index 000000000..5e3791499 --- /dev/null +++ b/include/exec/__detail/__tuple_index_pack.hpp @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../../stdexec/concepts.hpp" +#include "../../stdexec/__detail/__tuple.hpp" + +#include +#include +#include + +namespace exec { + +struct __tuple_index_pack_t { + template + auto operator()(Fn&& fn, T&& t, Tn&&... tn) const { + return fn(std::make_index_sequence>::value>(), std::forward(t), std::forward(tn)...); + } +}; +constexpr inline static __tuple_index_pack_t __tuple_index_pack; + +} // namespace exec diff --git a/include/exec/__detail/__tuple_reverse.hpp b/include/exec/__detail/__tuple_reverse.hpp new file mode 100644 index 000000000..cfe2d0084 --- /dev/null +++ b/include/exec/__detail/__tuple_reverse.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../../stdexec/concepts.hpp" +#include "../../stdexec/__detail/__tuple.hpp" + +#include +#include +#include + +namespace exec { + +template class R, class I, class... Tn> +struct __apply_reverse_impl; + +template class R, std::size_t... In, class... Tn> +struct __apply_reverse_impl, Tn...> { + using tn_t = std::tuple; + using type = R::type...>; +}; + +template class R, class... Tn> +using __apply_reverse = typename __apply_reverse_impl, Tn...>::type; + +struct __tuple_reverse_t { + template + static auto reverse(T&& t, std::index_sequence) { + return std::make_tuple(std::get(std::forward(t))...); + } + + template + auto operator()(T&& t) const { + return __tuple_reverse_t::reverse(std::forward(t), std::make_index_sequence::value>()); + } +}; +constexpr inline static __tuple_reverse_t __tuple_reverse; + +} // namespace exec diff --git a/include/exec/async_object.hpp b/include/exec/async_object.hpp new file mode 100644 index 000000000..9f230b77a --- /dev/null +++ b/include/exec/async_object.hpp @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../stdexec/execution.hpp" +#include "../stdexec/concepts.hpp" + +#include "__detail/__manual_lifetime.hpp" + +namespace exec { + +struct async_construct_t { + template + auto operator()(_O&& __o, _Stg& __stg, _An&&... __an) const + noexcept(noexcept(((_O&&)__o).async_construct(__stg, ((_An&&)__an)...))) + -> decltype(((_O&&)__o).async_construct(__stg, ((_An&&)__an)...)) { + using __construct = decltype(((_O&&)__o).async_construct(__stg, ((_An&&)__an)...)); + static_assert(!stdexec::same_as<__construct, void>, "async_construct must not return void"); + static_assert(stdexec::__single_value_sender<__construct>, "async_construct must return a sender with a single set_value overload"); + static_assert(stdexec::sender_of<__construct, stdexec::set_value_t(typename std::remove_cvref_t<_O>::handle)>, "async_construct must return a sender that completes with set_value(handle)"); + return ((_O&&)__o).async_construct(__stg, ((_An&&)__an)...); + } + template + auto operator()(_O&& __o, _Stg& __stg, _An&&... __an) const + noexcept(noexcept(((_O&&)__o).async_construct(__stg, ((_An&&)__an)...))) + -> std::enable_if_t< + stdexec::same_as, + decltype(((_O&&)__o).async_construct(__stg, ((_An&&)__an)...))> = delete; +}; +constexpr inline static async_construct_t async_construct{}; + +template +using async_construct_result_t = stdexec::__call_result_t&, typename std::remove_cvref_t<_O>::storage&, _An...>; + +struct async_destruct_t { + template + auto operator()(_O&& __o, _Stg& __stg) const + noexcept + -> std::enable_if_t< + !stdexec::same_as, + decltype(((_O&&)__o).async_destruct(__stg))> { + static_assert(noexcept(((_O&&)__o).async_destruct(__stg)), "async_destruct must be noexcept"); + using __destruct = decltype(((_O&&)__o).async_destruct(__stg)); + static_assert(!stdexec::same_as<__destruct, void>, "async_destruct must not return void"); + static_assert(stdexec::__single_value_sender<__destruct>, "async_destruct must return a sender with a single set_value overload"); + static_assert(stdexec::sender_of<__destruct, stdexec::set_value_t()>, "async_destruct must return a sender that completes with set_value()"); + static_assert(stdexec::__nofail_sender<__destruct>, "async_destruct must return a sender that has no set_error(..) completions"); + return ((_O&&)__o).async_destruct(__stg); + } + template + auto operator()(_O&& __o, _Stg& __stg) const + noexcept + -> std::enable_if_t< + stdexec::same_as, + decltype(((_O&&)__o).async_destruct(__stg))> = delete; +}; +constexpr inline static async_destruct_t async_destruct{}; + +template +using async_destruct_result_t = stdexec::__call_result_t&, typename std::remove_cvref_t<_O>::storage&>; + +namespace __async_object { + +template +concept __immovable_object = + !std::is_move_constructible_v<_T> && + !std::is_copy_constructible_v<_T> && + !std::is_move_assignable_v<_T> && + !std::is_copy_assignable_v<_T>; + +template +concept __object = + !std::is_default_constructible_v<_T> && + __immovable_object<_T>; + +template +concept __storage = + std::is_nothrow_default_constructible_v<_T> && + __immovable_object<_T>; + +template +concept __async_destruct_result_valid = + stdexec::__single_value_sender<_S> && + stdexec::sender_of<_S, stdexec::set_value_t()>; + +} // namespace __async_object + +template +concept async_object = + requires (){ + typename _T::object; + typename _T::handle; + typename _T::storage; + } && + std::is_move_constructible_v<_T> && + std::is_nothrow_move_constructible_v && + __async_object::__object && + __async_object::__storage && + requires (const _T& __t_clv, typename _T::storage& __s_lv){ + { async_destruct_t{}(__t_clv, __s_lv) } + -> stdexec::__nofail_sender; + } && + __async_object::__async_destruct_result_valid>; + +template +concept async_object_constructible_from = + async_object<_T> && + requires (const _T& __t_clv, typename _T::storage& __s_lv, _An... __an){ + { async_construct_t{}(__t_clv, __s_lv, __an...) } + -> stdexec::sender_of; + }; + +} // namespace exec diff --git a/include/exec/async_tuple.hpp b/include/exec/async_tuple.hpp new file mode 100644 index 000000000..dcc64bf87 --- /dev/null +++ b/include/exec/async_tuple.hpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "stdexec/__detail/__execution_fwd.hpp" + +#include "stdexec/concepts.hpp" +#include "stdexec/functional.hpp" + +#include "__detail/__tuple_reverse.hpp" + +#include "async_object.hpp" + +#include "variant_sender.hpp" + +namespace exec { + +// +// implementation of async_tuple +// + +template +struct __async_tuple { + + struct __t { + using __id = __async_tuple; + + using __fyn_t = stdexec::__decayed_std_tuple...>; + using __stgn_t = stdexec::__decayed_std_tuple::storage...>; + + STDEXEC_ATTRIBUTE((no_unique_address)) __fyn_t __fyn_; + + + explicit __t(__fyn_t __fyn) : __fyn_(std::move(__fyn)) {} + explicit __t(stdexec::__t<_FynId>... __fyn) : __fyn_(std::move(__fyn)...) {} + + using object = std::tuple::handle...>; + using handle = std::tuple::handle...>; + struct storage : stdexec::__immovable { + STDEXEC_ATTRIBUTE((no_unique_address)) std::optional<__fyn_t> __fyn_; + STDEXEC_ATTRIBUTE((no_unique_address)) __stgn_t __stgn_; + std::optional o; + }; + + auto async_construct(storage& stg) noexcept { + stg.__fyn_.emplace(__fyn_); + auto mc = stdexec::__apply( + [&](typename stdexec::__t<_FynId>&... __fyn) noexcept { + return stdexec::__apply( + [&](typename stdexec::__t<_FynId>::storage&... __stgn) noexcept { + return stdexec::when_all( + stdexec::just(std::ref(stg)), + exec::async_construct(__fyn, __stgn)... + ); + }, stg.__stgn_); + }, stg.__fyn_.value()); + auto oc = stdexec::then(mc, [](storage& stg, typename stdexec::__t<_FynId>::handle... hn) noexcept { + auto construct = [&]() noexcept { return object{hn...}; }; + stg.o.emplace(stdexec::__conv{construct}); + return handle{stg.o.value()}; + }); + return oc; + } + + using __ref_storage = stdexec::__call_result_t>; + template + using __destruction_n = stdexec::__call_result_t; + template + using __destruct_all = stdexec::__call_result_t; + using __destruction = exec::__apply_reverse<__destruct_all, __destruction_n>...>; + + auto async_destruct(storage& stg) noexcept { + auto make_destruct = [&] () noexcept -> exec::variant_sender<__destruction, __ref_storage> { + if (stg.__fyn_.has_value()) { + return stdexec::__apply( + [&](typename stdexec::__t<_FynId>&&... __fyn) noexcept { + return stdexec::__apply( + [&](typename stdexec::__t<_FynId>::storage&... __stgn) noexcept { + return stdexec::__apply( + [&](auto&&... __d) noexcept { + return stdexec::when_all( + stdexec::just(std::ref(stg)), + __d...); + }, exec::__tuple_reverse(std::make_tuple(exec::async_destruct(__fyn, __stgn)...))); + }, stg.__stgn_); + }, std::move(stg.__fyn_.value())); + } else { + return stdexec::just(std::ref(stg)); + } + }; + auto od = stdexec::then(make_destruct(), [](storage& stg) noexcept { + stg.o.reset(); + stg.__fyn_.reset(); + }); + return od; + } + + }; +}; + +template +using __async_tuple_t = stdexec::__t<__async_tuple>...>>; + +// make_async_tuple is an algorithm that creates an async-object that +// contains a tuple of the given async-objects. +// the async_tuple object will compose the async-constructors and +// async-destructors of all the given async-objects +struct make_async_tuple_t { + template + __async_tuple_t<_Fyn...> operator()(_Fyn&&... __fyn) const { + using __fyn_t = typename __async_tuple_t<_Fyn...>::__fyn_t; + return __async_tuple_t<_Fyn...>{__fyn_t{(_Fyn&&)__fyn...}}; + } +}; +constexpr inline static make_async_tuple_t make_async_tuple{}; + +} // namespace exec diff --git a/include/exec/async_using.hpp b/include/exec/async_using.hpp new file mode 100644 index 000000000..6691bf538 --- /dev/null +++ b/include/exec/async_using.hpp @@ -0,0 +1,476 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "stdexec/__detail/__execution_fwd.hpp" +#include "stdexec/__detail/__transform_completion_signatures.hpp" + +#include "stdexec/concepts.hpp" +#include "stdexec/functional.hpp" + +#include "__detail/__decl_receiver.hpp" +#include "__detail/__tuple_reverse.hpp" +#include "__detail/__tuple_index_pack.hpp" + +#include "async_object.hpp" + +namespace exec { + +// +// implementation of async_using +// + +namespace __async_using { + +template +using __variant_for_t = // + stdexec::__for_each_completion_signature< + _Sigs, + stdexec::__decayed_std_tuple, + stdexec::__nullable_std_variant>; + +template +using __omit_set_value_t = stdexec::completion_signatures<>; + +template +using __non_value_completion_signatures_t = // + stdexec::transform_completion_signatures_of< + _Sender, + _Env, + stdexec::completion_signatures<>, + __omit_set_value_t>; + +template +struct __destructed { + using _Result = stdexec::__t<_ResultId>; + using _ErrorCompletionFilter = stdexec::__t<_ErrorCompletionFilterId>; + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __destructed; + + using receiver_concept = stdexec::receiver_t; + + _Result* __result_; + _Receiver* __rcvr_; + + void set_value() && noexcept { + STDEXEC_ASSERT(!__result_->valueless_by_exception()); + std::visit( + [__rcvr = this->__rcvr_](_Tup& __tupl) noexcept -> void { + if constexpr (stdexec::same_as<_Tup, std::monostate>) { + std::terminate(); // reaching this indicates a bug + } else { + stdexec::__apply( + [&](auto __tag, _Args&... __args) noexcept -> void { + // use calculation of the completion_signatures in __sender + // to filter out set_error_t(std::exception_ptr) when it cannot occur + if constexpr (stdexec::same_as< + stdexec::completion_signatures, + _ErrorCompletionFilter>) { + std::terminate(); + } else { + __tag(std::move(*__rcvr), (_Args&&) __args...); + } + }, + __tupl); + } + }, + *__result_); + } + + template + void set_error(_Error&&) && noexcept = delete; + + void set_stopped() && noexcept { + STDEXEC_ASSERT(!__result_->valueless_by_exception()); + std::visit( + [__rcvr = this->__rcvr_](_Tup& __tupl) noexcept -> void { + if constexpr (stdexec::same_as<_Tup, std::monostate>) { + std::terminate(); // reaching this indicates a bug + } else { + stdexec::__apply( + [&](auto __tag, _Args&... __args) noexcept -> void { + // use calculation of the completion_signatures in __sender + // to filter out set_error_t(std::exception_ptr) when it cannot occur + if constexpr (stdexec::same_as< + stdexec::completion_signatures, + _ErrorCompletionFilter>) { + std::terminate(); + } else { + __tag(std::move(*__rcvr), (_Args&&) __args...); + } + }, + __tupl); + } + }, + *__result_); + } + + stdexec::env_of_t<_Receiver> get_env() && noexcept { + return stdexec::get_env(*__rcvr_); + } + }; +}; + +template +struct __outside { + using _Result = stdexec::__t<_ResultId>; + using _DestructState = stdexec::__t<_DestructStateId>; + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __outside; + + using receiver_concept = stdexec::receiver_t; + + _Result* __result_; + _DestructState* __destruct_state_; + _Receiver* __rcvr_; + + template + void set_value(_An&&... __an) && noexcept { + using __async_result = stdexec::__decayed_std_tuple; + __result_->template emplace<__async_result>(stdexec::set_value, (_An&&)__an...); + stdexec::start(*__destruct_state_); + } + + template + void set_error(_Error&& __err) && noexcept { + using __async_result = stdexec::__decayed_std_tuple; + __result_->template emplace<__async_result>(stdexec::set_error, (_Error&&) __err); + stdexec::start(*__destruct_state_); + } + + void set_stopped() && noexcept { + using __async_result = stdexec::__decayed_std_tuple; + __result_->template emplace<__async_result>(stdexec::set_stopped); + stdexec::start(*__destruct_state_); + } + + stdexec::env_of_t<_Receiver> get_env() const& noexcept { + return stdexec::get_env(*__rcvr_); + } + }; +}; + +template +struct __constructed { + using _Result = stdexec::__t<_ResultId>; + using _InnerFn = stdexec::__t<_InnerFnId>; + using _InsideState = stdexec::__t<_InsideStateId>; + using _DestructState = stdexec::__t<_DestructStateId>; + using _ErrorCompletionFilter = stdexec::__t<_ErrorCompletionFilterId>; + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __constructed; + + using receiver_concept = stdexec::receiver_t; + + using __fyn_t = stdexec::__decayed_std_tuple...>; + using __stgn_t = stdexec::__decayed_std_tuple::storage...>; + + using __inside = stdexec::__call_result_t<_InnerFn, typename stdexec::__t<_FynId>::handle...>; + + using __destructed_t = stdexec::__t<__destructed<_ResultId, _ErrorCompletionFilterId, _ReceiverId>>; + using __outside_t = stdexec::__t<__outside<_ResultId, _DestructStateId, _ReceiverId>>; + + __fyn_t* __fyn_; + __stgn_t* __stgn_; + _Result* __result_; + _InnerFn* __inner_; + std::optional<_InsideState>* __inside_state_; + std::optional<_DestructState>* __destruct_state_; + _Receiver* __rcvr_; + + template + using __destruction_n = stdexec::__call_result_t; + template + using __destruct_all = stdexec::__call_result_t; + using __destruction = exec::__apply_reverse<__destruct_all, __destruction_n>...>; + using __destruct_state = stdexec::connect_result_t<__destruction, __destructed_t>; + + void __make_destruct() noexcept { + auto __destruct = [this](){ + return stdexec::connect( + exec::__tuple_index_pack( + [](std::index_sequence, auto&& __fyn, auto&& __stgn) noexcept { + return stdexec::when_all(exec::async_destruct( + std::get(__fyn), + std::get(__stgn))...); + }, *__fyn_, *__stgn_), + __destructed_t{__result_, __rcvr_}); + }; + __destruct_state_->emplace(stdexec::__conv{__destruct}); + } + + void set_value(typename stdexec::__t<_FynId>::handle... __o) && noexcept { + // launch nested function + auto inside = [&, this] { + __make_destruct(); + auto inner = (*__inner_)(typename stdexec::__t<_FynId>::handle{__o}...); + return stdexec::connect(std::move(inner), __outside_t{__result_, &__destruct_state_->value(), __rcvr_}); + }; + if constexpr ( + stdexec::__nothrow_callable<_InnerFn, typename stdexec::__t<_FynId>::handle...> && + stdexec::__nothrow_callable) { + __inside_state_->emplace(stdexec::__conv{inside}); + } else { + try { + __inside_state_->emplace(stdexec::__conv{inside}); + } catch (...) { + using __async_result = stdexec::__decayed_std_tuple; + __result_->template emplace<__async_result>(stdexec::set_error, std::current_exception()); + __make_destruct(); + stdexec::start(__destruct_state_->value()); + return; + } + } + stdexec::start(__inside_state_->value()); + } + + template + void set_error(_Error&& __err) && noexcept { + using __async_result = stdexec::__decayed_std_tuple; + __result_->template emplace<__async_result>(stdexec::set_error, (_Error&&) __err); + __make_destruct(); + stdexec::start(__destruct_state_->value()); + } + + void set_stopped() && noexcept { + using __async_result = stdexec::__decayed_std_tuple; + __result_->template emplace<__async_result>(stdexec::set_stopped); + __make_destruct(); + stdexec::start(__destruct_state_->value()); + } + + stdexec::env_of_t<_Receiver> get_env() const& noexcept { + return stdexec::get_env(*__rcvr_); + } + }; +}; + +// async-using operation state. +// constructs all the async-objects into reserved storage +// destructs all the async-objects in the reserved storage +template +struct __operation { + using _InnerFn = stdexec::__t<_InnerFnId>; + using _Receiver = stdexec::__t<_ReceiverId>; + using _ErrorCompletionFilter = stdexec::__t<_ErrorCompletionFilterId>; + using fyn_t = stdexec::__decayed_std_tuple...>; + using stgn_t = stdexec::__decayed_std_tuple::storage...>; + + struct __t { + using __id = __operation; + + template + using __construction_n = stdexec::__call_result_t; + using __construction = stdexec::__call_result_t>...>; + + using __inside = stdexec::__call_result_t<_InnerFn, typename stdexec::__t<_FynId>::handle...>; + using __result_t = __async_using::__variant_for_t< + stdexec::__concat_completion_signatures< + __async_using::__non_value_completion_signatures_t<__construction, stdexec::env_of_t<_Receiver>>, + stdexec::completion_signatures_of_t<__inside, stdexec::env_of_t<_Receiver>>, + // always reserve storage for exception_ptr so that the actual + // completion-signatures can be calculated + stdexec::completion_signatures>>; + + using __destructed_t = stdexec::__t<__destructed, _ErrorCompletionFilterId, _ReceiverId>>; + template + using __destruction_n = stdexec::__call_result_t; + template + using __destruct_all = stdexec::__call_result_t; + using __destruction = exec::__apply_reverse<__destruct_all, __destruction_n>...>; + using __destruct_state = stdexec::connect_result_t<__destruction, __destructed_t>; + + using __outside_t = stdexec::__t<__outside, stdexec::__id<__destruct_state>, _ReceiverId>>; + using __inside_state = stdexec::connect_result_t<__inside, __outside_t>; + + using __constructed_t = stdexec::__t<__constructed< + stdexec::__id<__result_t>, + _InnerFnId, + stdexec::__id<__inside_state>, + stdexec::__id<__destruct_state>, + _ErrorCompletionFilterId, + _ReceiverId, + _FynId...>>; + using __construct_state = stdexec::connect_result_t<__construction, __constructed_t>; + + STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_; + STDEXEC_ATTRIBUTE((no_unique_address)) _InnerFn __inner_; + STDEXEC_ATTRIBUTE((no_unique_address)) fyn_t __fyn_; + + STDEXEC_ATTRIBUTE((no_unique_address)) stgn_t __stgn_; + STDEXEC_ATTRIBUTE((no_unique_address)) __result_t __result_; + STDEXEC_ATTRIBUTE((no_unique_address)) std::optional<__construct_state> __construct_state_; + STDEXEC_ATTRIBUTE((no_unique_address)) std::optional<__destruct_state> __destruct_state_; + STDEXEC_ATTRIBUTE((no_unique_address)) std::optional<__inside_state> __inside_state_; + + __t(_Receiver __r_, _InnerFn __i_, fyn_t __fy_) : + __rcvr_(std::move(__r_)), __inner_(std::move(__i_)), + __fyn_(std::move(__fy_)) { + auto __construct = [this](){ + return stdexec::connect( + exec::__tuple_index_pack( + [&](std::index_sequence, auto&& __fyn, auto&& __stgn) noexcept { + return stdexec::when_all(exec::async_construct(std::get(__fyn), std::get(__stgn))...); + }, __fyn_, __stgn_), + __constructed_t{&__fyn_, &__stgn_, &__result_, &__inner_, &__inside_state_, &__destruct_state_, &__rcvr_}); + }; + __construct_state_.emplace(stdexec::__conv{__construct}); + } + + void start() noexcept; + }; +}; + +template +struct __sender { + using _InnerFn = stdexec::__t<_InnerFnId>; + + struct __t { + using __id = __sender; + + using __fyn_t = stdexec::__decayed_std_tuple...>; + + _InnerFn __inner_; + __fyn_t __fyn_; + explicit __t(_InnerFn __i, __fyn_t __fy_) : __inner_(std::move(__i)), __fyn_(std::move(__fy_)) {} + + using sender_concept = stdexec::sender_t; + + template + using __construction_n = stdexec::__call_result_t; + using __construction = stdexec::__call_result_t>...>; + + using __inside = stdexec::__call_result_t<_InnerFn, typename stdexec::__t<_FynId>::handle...>; + + using __exception_completion = stdexec::completion_signatures; + + template + using __result_t = __async_using::__variant_for_t< + stdexec::__concat_completion_signatures< + __async_using::__non_value_completion_signatures_t<__construction, stdexec::env_of_t<_Receiver>>, + stdexec::completion_signatures_of_t<__inside, stdexec::env_of_t<_Receiver>>, + // always reserve *storage* for exception_ptr so that the actual + // completion-signatures can be calculated + __exception_completion>>; + + // + // calculate the completion_signatures using a __decl_receiver<_Env> + // and an empty error completion filter + // + + template + using __destructed_t = stdexec::__t<__destructed< + stdexec::__id<__result_t<_Receiver>>, + // do not filter out any completion in result_t + // so that the actual completion-signatures can be calculated + stdexec::__id>, + stdexec::__id<_Receiver>>>; + template + using __destruction_n = stdexec::__call_result_t; + using __destruction = stdexec::__call_result_t>...>; + template + using __destruct_state = stdexec::connect_result_t<__destruction, __destructed_t<_Receiver>>; + + template + using __outside_t = stdexec::__t<__outside< + stdexec::__id<__result_t<_Receiver>>, + stdexec::__id<__destruct_state<_Receiver>>, + stdexec::__id<_Receiver>>>; + + template + using __fake_rcvr = stdexec::__t>; + + // calculate if using InnerFn can throw + template + static constexpr bool __inner_nothrow = + stdexec::__nothrow_callable<_InnerFn, typename stdexec::__t<_FynId>::handle...> && + stdexec::__nothrow_callable>; + + template + STDEXEC_ATTRIBUTE((always_inline)) // + auto get_completion_signatures(_Env&& __env) const noexcept // + -> stdexec::__concat_completion_signatures< + // add completions of sender returned from InnerFn + stdexec::completion_signatures_of_t<__inside, _Env>, + // add non-set_value completions of all the async-constructors + __async_using::__non_value_completion_signatures_t<__construction, _Env>, + // add std::exception_ptr if using InnerFn can throw + stdexec::__if_c< + __inner_nothrow<__fake_rcvr<_Env>>, + stdexec::completion_signatures<>, + __exception_completion>> { + return {}; + } + + private: + // + // produce the actual operation once the receiver is connected + // + + // calculate the filter to use when applying result_t to the _Receiver + template + using __error_completion_filter = stdexec::__if_c< + __inner_nothrow<_Receiver>, + __exception_completion, + stdexec::completion_signatures<>>; + + template + using __operation = stdexec::__t<__operation< + _InnerFnId, stdexec::__id>, + // apply the filter to use when applying result_t to the _Receiver + stdexec::__id<__error_completion_filter<_Receiver>>, + _FynId...>>; + + using connect_t = stdexec::connect_t; + template + requires stdexec::receiver_of<_Receiver, stdexec::completion_signatures_of_t<__t, stdexec::env_of_t<_Receiver>>> + STDEXEC_MEMFN_DECL(auto connect)(this const __t& __self, _Receiver __rcvr) -> __operation<_Receiver> { + return {(_Receiver&&) __rcvr, __self.__inner_, __self.__fyn_}; + } + }; +}; +template +using __sender_t = stdexec::__t<__sender>, stdexec::__id>...>>; + +template +inline void __operation<_InnerFnId, _ReceiverId, _ErrorCompletionFilterId, _FynId...>::__t::start() noexcept { + stdexec::start(__construct_state_.value()); +} + +} // namespace __async_using + +// async_using is an algorithm that creates a set of async-objects +// and provides handles to the constructed objects to a given async-function +struct async_using_t { + template + using sender_t = __async_using::__sender_t<_InnerFn, _Fyn...>; + + template + sender_t<_InnerFn, _Fyn...> operator()(_InnerFn&& __inner, _Fyn&&... __fyn) const { + using __fyn_t = typename sender_t<_InnerFn, _Fyn...>::__fyn_t; + return sender_t<_InnerFn, _Fyn...>{(_InnerFn&&)__inner, __fyn_t{(_Fyn&&)__fyn...}}; + } +}; +constexpr inline static async_using_t async_using{}; + +} // namespace exec diff --git a/include/exec/packaged_async_object.hpp b/include/exec/packaged_async_object.hpp new file mode 100644 index 000000000..b6c074b71 --- /dev/null +++ b/include/exec/packaged_async_object.hpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "stdexec/__detail/__execution_fwd.hpp" +#include "stdexec/__detail/__transform_completion_signatures.hpp" + +#include "stdexec/concepts.hpp" +#include "stdexec/functional.hpp" + +#include "async_object.hpp" + +namespace exec { + +// +// packaged_async_object +// A utility to allow async_using to take a pack of +// async-objects that have a defaulted async_constructor +// +template +struct packaged_async_object { + using object = typename _O::object; + using handle = typename _O::handle; + using storage = typename _O::storage; + using arguments = stdexec::__decayed_std_tuple<_An...>; + + packaged_async_object() = delete; + template _T, class... _Tn> + explicit packaged_async_object(_T&& __t, _Tn&&... __tn) : + __o_((_T&&)__t), + __an_((_Tn&&)__tn...) { + } +private: + _O __o_; + arguments __an_; + +public: + + auto async_construct(storage& stg) noexcept(noexcept(__o_.async_construct(std::declval(), std::declval<_An&&>()...))) { + return stdexec::__apply( + [&](_Args&&... __args) noexcept(noexcept(__o_.async_construct(std::declval(), (_Args&&) __args...))) { + return this->__o_.async_construct(stg, (_Args&&) __args...); + }, + __an_); + } + + auto async_destruct(storage& stg) noexcept { + return __o_.async_destruct(stg); + } +}; + +template +packaged_async_object(_O&&, _An&&...) -> packaged_async_object, std::remove_cvref_t<_An>...>; + +struct pack_async_object_t { + template + auto operator()(T&& t, Tn&&... tn) const + -> packaged_async_object, std::remove_cvref_t...> { + return packaged_async_object{std::forward(t), std::forward(tn)...}; + } +}; +constexpr inline static pack_async_object_t pack_async_object; + +} // namespace exec diff --git a/include/exec/stop_object.hpp b/include/exec/stop_object.hpp new file mode 100644 index 000000000..efba7c424 --- /dev/null +++ b/include/exec/stop_object.hpp @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2024 Kirk Shoop + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "stdexec/__detail/__execution_fwd.hpp" + +#include "stdexec/concepts.hpp" +#include "stdexec/functional.hpp" +#include "stdexec/stop_token.hpp" + +#include "finally.hpp" + +#include "async_object.hpp" +#include "async_using.hpp" +#include "packaged_async_object.hpp" + +#include + +namespace exec { + +template +struct stop_callback_object { + using object = typename _Token::template callback_type<_Callback>; + class handle { + friend struct stop_callback_object; + explicit handle() {} + }; + using storage = std::optional; + + template + auto async_construct(storage& stg, _T&& __t, _C&& __c) const noexcept { + auto construct = [](storage& stg, auto&& __t, auto&& __c) noexcept -> handle { + stg.emplace(static_cast<_T&&>(__t), static_cast<_C&&>(__c)); + return handle{}; + }; + return stdexec::then(stdexec::just(std::ref(stg), static_cast<_T&&>(__t), static_cast<_C&&>(__c)), construct); + } + auto async_destruct(storage& stg) const noexcept { + auto destruct = [](storage& stg) noexcept { + stg.reset(); + }; + return stdexec::then(stdexec::just(std::ref(stg)), destruct); + } +}; + +struct stop_object { + using object = stdexec::inplace_stop_source; + class handle { + object* source; + friend struct stop_object; + explicit handle(object& s) : source(&s) {} + public: + handle() = delete; + handle(const handle&) = default; + handle(handle&&) = default; + handle& operator=(const handle&) = default; + handle& operator=(handle&&) = default; + + stdexec::inplace_stop_token get_token() const noexcept { + return source->get_token(); + } + bool stop_requested() const noexcept { + return source->stop_requested(); + } + static constexpr bool stop_possible() noexcept { + return true; + } + bool request_stop() noexcept { + return source->request_stop(); + } + // chain has two effects + // 1. chain applies the stop_token for this stop-source to the env of + // the given sender. + // 2. chain retrieves the stop_token from the environment of the receiver + // connected to the returned sender, and uses a stop_callback_object + // to forward a stop_request from the external stop-source to this + // stop-source + auto chain(auto sender) noexcept { + auto stop_token = source->get_token(); + auto bind = [sender, stop_token, source = this->source](auto ext_stop) noexcept { + auto callback = [source]() noexcept {source->request_stop();}; + auto with_callback = [sender, stop_token](auto cb) noexcept { + return stdexec::__write_env( + std::move(sender), + stdexec::__env::__with(stop_token, stdexec::get_stop_token)); + }; + exec::packaged_async_object cb{stop_callback_object{}, ext_stop, callback}; + return exec::async_using(with_callback, cb); + }; + return stdexec::let_value(stdexec::read_env(stdexec::get_stop_token), bind); + } + }; + using storage = std::optional; + + auto async_construct(storage& stg) const noexcept { + auto construct = [](storage& stg) noexcept -> handle { + stg.emplace(); + return handle{stg.value()}; + }; + return stdexec::then(stdexec::just(std::ref(stg)), construct); + } + auto async_destruct(storage& stg) const noexcept { + auto destruct = [](storage& stg) noexcept { + stg.reset(); + }; + return stdexec::then(stdexec::just(std::ref(stg)), destruct); + } +}; + + +} // namespace exec diff --git a/include/stdexec/__detail/__optional.hpp b/include/stdexec/__detail/__optional.hpp index dbec013e4..b5b9134aa 100644 --- a/include/stdexec/__detail/__optional.hpp +++ b/include/stdexec/__detail/__optional.hpp @@ -75,7 +75,6 @@ namespace stdexec { } template - requires constructible_from<_Tp, _Us...> _Tp& emplace(_Us&&... __us) noexcept(__nothrow_constructible_from<_Tp, _Us...>) { reset(); // sets __has_value to false in case the next line throws ::new (&__value) _Tp{static_cast<_Us&&>(__us)...}; @@ -83,6 +82,15 @@ namespace stdexec { return __value; } + template + _Tp& emplace_from(_Fn&& __fn) noexcept(__nothrow_callable<_Fn>) { + static_assert(__same_as<__call_result_t<_Fn>, _Tp>); + reset(); // sets __has_value to false in case the next line throws + ::new (&__value) _Tp(static_cast<_Fn&&>(__fn)()); + __has_value = true; + return __value; + } + _Tp& value() & { if (!__has_value) { throw __bad_optional_access(); diff --git a/include/stdexec/__detail/__read_env.hpp b/include/stdexec/__detail/__read_env.hpp index db0e2632e..6408d17fa 100644 --- a/include/stdexec/__detail/__read_env.hpp +++ b/include/stdexec/__detail/__read_env.hpp @@ -111,9 +111,9 @@ namespace stdexec { } else { constexpr bool _Nothrow = __nothrow_callable<__query, env_of_t<_Receiver>>; auto __query_fn = [&]() noexcept(_Nothrow) -> __result&& { - __state.__result_.emplace(__conv{[&]() noexcept(_Nothrow) { + __state.__result_.emplace_from([&]() noexcept(_Nothrow) { return __query()(stdexec::get_env(__rcvr)); - }}); + }); return static_cast<__result&&>(*__state.__result_); }; stdexec::__set_value_invoke(static_cast<_Receiver&&>(__rcvr), __query_fn); diff --git a/include/stdexec/__detail/__senders.hpp b/include/stdexec/__detail/__senders.hpp index b228a509f..7de64056b 100644 --- a/include/stdexec/__detail/__senders.hpp +++ b/include/stdexec/__detail/__senders.hpp @@ -258,4 +258,18 @@ namespace stdexec { _Env, __mcompose_q<__types_ref, __qf<__tag_of_sig_t<_SetSig>>::template __f>, __mappend_into_q<__types_ref>>>; + + template + requires false + using __nofail_t = _Error; + + template + concept __nofail_sender = sender_in<_Sender, _Env> && requires { + typename __gather_completion_signatures< + __completion_signatures_of_t<_Sender, _Env>, + set_error_t, + __nofail_t, + __sigs::__default_completion, + __types>; + }; } // namespace stdexec diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5648a538a..2cb88df2e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -74,6 +74,10 @@ set(stdexec_test_sources exec/test_on3.cpp exec/test_repeat_effect_until.cpp exec/test_repeat_n.cpp + exec/async_object/test_async_tuple.cpp + exec/async_object/test_stop_object0.cpp + exec/async_object/test_stop_object1.cpp + exec/async_object/test_stop_object2.cpp exec/async_scope/test_dtor.cpp exec/async_scope/test_spawn.cpp exec/async_scope/test_spawn_future.cpp @@ -101,6 +105,8 @@ set_target_properties(test.stdexec PROPERTIES CXX_STANDARD_REQUIRED ON CXX_EXTENSIONS OFF) target_include_directories(test.stdexec PRIVATE ${CMAKE_CURRENT_LIST_DIR}) +target_compile_options(test.stdexec PRIVATE + $<$,$>:-ftemplate-backtrace-limit=0>) target_link_libraries(test.stdexec PUBLIC STDEXEC::stdexec diff --git a/test/exec/async_object/test_async_tuple.cpp b/test/exec/async_object/test_async_tuple.cpp new file mode 100644 index 000000000..9534f91c5 --- /dev/null +++ b/test/exec/async_object/test_async_tuple.cpp @@ -0,0 +1,31 @@ +#include +#include +#include +#include +#include + +#include "test_common/schedulers.hpp" +#include "test_common/receivers.hpp" + +namespace ex = stdexec; +using exec::stop_object; +using exec::async_using; +using stdexec::sync_wait; + +namespace { + + TEST_CASE("async_tuple simple", "[stop_object][async_object][async_tuple]") { + auto with_tuple = [](auto tpl) noexcept { + auto [s0, s1] = tpl; + return s0.chain(ex::just(false)); + }; + ex::sender auto snd = async_using( + with_tuple, + exec::make_async_tuple(stop_object{}, stop_object{})); + auto r = sync_wait(std::move(snd)); + REQUIRE(r.has_value()); + auto [v] = r.value(); + REQUIRE(v == false); + } + +} \ No newline at end of file diff --git a/test/exec/async_object/test_stop_object0.cpp b/test/exec/async_object/test_stop_object0.cpp new file mode 100644 index 000000000..e7c2c3e7a --- /dev/null +++ b/test/exec/async_object/test_stop_object0.cpp @@ -0,0 +1,26 @@ +#include +#include +#include +#include "test_common/schedulers.hpp" +#include "test_common/receivers.hpp" + +namespace ex = stdexec; +using exec::stop_object; +using exec::async_using; +using stdexec::sync_wait; + +namespace { + using handle = typename stop_object::handle; + + TEST_CASE("stop_object unused", "[stop_object][async_object]") { + auto with_stop_object = [](handle s0) noexcept { + return s0.chain(ex::just(false)); + }; + ex::sender auto snd = async_using(with_stop_object, stop_object{}); + auto r = sync_wait(std::move(snd)); + REQUIRE(r.has_value()); + auto [v] = r.value(); + REQUIRE(v == false); + } + +} \ No newline at end of file diff --git a/test/exec/async_object/test_stop_object1.cpp b/test/exec/async_object/test_stop_object1.cpp new file mode 100644 index 000000000..e8e3a0828 --- /dev/null +++ b/test/exec/async_object/test_stop_object1.cpp @@ -0,0 +1,28 @@ +#include +#include +#include +#include "test_common/schedulers.hpp" +#include "test_common/receivers.hpp" + +namespace ex = stdexec; +using exec::stop_object; +using exec::async_using; +using stdexec::sync_wait; + +namespace { + using handle = typename stop_object::handle; + + TEST_CASE("chained stop_object is not stopped", "[stop_object][async_object]") { + auto with_stop_objects = [](handle s0, handle s1) noexcept { + auto with_s1_stop_token = [](auto stp) noexcept { return stp.stop_requested(); }; + auto inside = ex::then(ex::read_env(ex::get_stop_token), with_s1_stop_token); + return s0.chain(s1.chain(inside)); + }; + ex::sender auto snd = async_using(with_stop_objects, stop_object{}, stop_object{}); + auto r = sync_wait(std::move(snd)); + REQUIRE(r.has_value()); + auto [v] = r.value(); + REQUIRE(v == false); + } + +} \ No newline at end of file diff --git a/test/exec/async_object/test_stop_object2.cpp b/test/exec/async_object/test_stop_object2.cpp new file mode 100644 index 000000000..84aa89160 --- /dev/null +++ b/test/exec/async_object/test_stop_object2.cpp @@ -0,0 +1,33 @@ +#include +#include +#include +#include "test_common/schedulers.hpp" +#include "test_common/receivers.hpp" + +namespace ex = stdexec; +using exec::stop_object; +using exec::async_using; +using stdexec::sync_wait; + +namespace { + using handle = typename stop_object::handle; + + TEST_CASE("chained stop_object is stopped", "[stop_object][async_object]") { + auto with_stop_objects = [](handle s0, handle s1) noexcept { + auto with_s1_stop_token = [s0](auto stp) mutable noexcept { + REQUIRE(s0.stop_requested() == false); + REQUIRE(stp.stop_requested() == false); + s0.request_stop(); + return stp.stop_requested(); + }; + auto inside = ex::then(ex::read_env(ex::get_stop_token), with_s1_stop_token); + return s0.chain(s1.chain(inside)); + }; + ex::sender auto snd = async_using(with_stop_objects, stop_object{}, stop_object{}); + auto r = sync_wait(std::move(snd)); + REQUIRE(r.has_value()); + auto [v] = r.value(); + REQUIRE(v == true); + } + + } // namespace \ No newline at end of file