From 531d86a5c08dee61f019a93ef0045fb50404f651 Mon Sep 17 00:00:00 2001 From: Robert Leahy Date: Fri, 24 Oct 2025 14:37:07 -0400 Subject: [PATCH] exec::unless_stop_requested --- include/exec/unless_stop_requested.hpp | 102 +++++++++++++++++++++++ test/exec/CMakeLists.txt | 1 + test/exec/test_unless_stop_requested.cpp | 88 +++++++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 include/exec/unless_stop_requested.hpp create mode 100644 test/exec/test_unless_stop_requested.cpp diff --git a/include/exec/unless_stop_requested.hpp b/include/exec/unless_stop_requested.hpp new file mode 100644 index 000000000..41d3cf8ea --- /dev/null +++ b/include/exec/unless_stop_requested.hpp @@ -0,0 +1,102 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../stdexec/execution.hpp" + +namespace exec { + namespace __unless_stop_requested { + using namespace stdexec; + + template + inline constexpr bool __unstoppable_env = unstoppable_token>; + + template + inline constexpr bool __unstoppable_receiver = __unstoppable_env>; + + template + using __completions = transform_completion_signatures< + __completion_signatures_of_t<_Sender, _Env>, + std::conditional_t< + __unstoppable_env<_Env>, + completion_signatures<>, + completion_signatures>>; + + struct __connect_fn { + template + requires __unstoppable_receiver<_Receiver> + constexpr connect_result_t<__child_of<_Sender>, _Receiver> + operator()(_Sender&& __sndr, _Receiver __rcvr) const noexcept( + noexcept(stdexec::connect(__declval<__child_of<_Sender>>(), (_Receiver&&) __rcvr))) { + return __sexpr_apply((_Sender&&) __sndr, [&](auto, const auto&, auto&& __child) { + return stdexec::connect((decltype(__child)&&) __child, (_Receiver&&) __rcvr); + }); + } + template + constexpr __op_state<_Sender, _Receiver> operator()(_Sender&& __sndr, _Receiver __rcvr) const + noexcept(__nothrow_constructible_from<__op_state<_Sender, _Receiver>, _Sender, _Receiver>) { + return __op_state<_Sender, _Receiver>{(_Sender&&) __sndr, (_Receiver&&) __rcvr}; + } + }; + + struct unless_stop_requested_t : sender_adaptor_closure { + constexpr auto operator()() const noexcept { + return *this; + } + template + constexpr __well_formed_sender auto operator()(_Sender&& __sndr) const { + auto __domain = __get_early_domain(__sndr); + return stdexec::transform_sender( + __domain, __make_sexpr(__(), static_cast<_Sender&&>(__sndr))); + } + }; + + struct __unless_stop_requested_impl : __sexpr_defaults { + static constexpr auto get_completion_signatures = + [](_Self&&, _Env&&) noexcept + -> __completions<__child_of<_Self>, _Env> { + static_assert(sender_expr_for<_Self, unless_stop_requested_t>); + return {}; + }; + + static constexpr auto start = []( + _State&, + _Receiver& __rcvr, + _Operation& __child_op) noexcept -> void { + static_assert(!__unstoppable_receiver<_Receiver>); + if (get_stop_token(stdexec::get_env(__rcvr)).stop_requested()) { + stdexec::set_stopped((_Receiver&&) __rcvr); + return; + } + stdexec::start(__child_op); + }; + + static constexpr __connect_fn connect{}; + }; + } // namespace __unless_stop_requested + + using __unless_stop_requested::unless_stop_requested_t; + inline constexpr __unless_stop_requested::unless_stop_requested_t unless_stop_requested{}; +} // namespace exec + +namespace stdexec { + template <> + struct __sexpr_impl<::exec::unless_stop_requested_t> + : ::exec::__unless_stop_requested::__unless_stop_requested_impl { }; +} // namespace stdexec diff --git a/test/exec/CMakeLists.txt b/test/exec/CMakeLists.txt index 50eff8e79..e77a03184 100644 --- a/test/exec/CMakeLists.txt +++ b/test/exec/CMakeLists.txt @@ -55,6 +55,7 @@ set(exec_test_sources $<$:../execpools/test_asio_thread_pool.cpp> test_system_context.cpp $<$:test_libdispatch.cpp> + test_unless_stop_requested.cpp ) add_executable(test.exec ${exec_test_sources}) diff --git a/test/exec/test_unless_stop_requested.cpp b/test/exec/test_unless_stop_requested.cpp new file mode 100644 index 000000000..53fe8a510 --- /dev/null +++ b/test/exec/test_unless_stop_requested.cpp @@ -0,0 +1,88 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include +#include + +#include "../test_common/receivers.hpp" +#include "../test_common/type_helpers.hpp" + +namespace { + + TEST_CASE( + "When stop has not been requested the child operation runs normally", + "[unless_stop_requested]") { + ::stdexec::inplace_stop_source source; + auto env = ::stdexec::prop(::stdexec::get_stop_token, source.get_token()); + static_assert(!::exec::__unless_stop_requested::__unstoppable_env); + { + auto sender = ::stdexec::just() | ::exec::unless_stop_requested; + static_assert( + set_equivalent< + ::stdexec::completion_signatures_of_t, + ::stdexec::completion_signatures<::stdexec::set_value_t(), ::stdexec::set_stopped_t()>>); + auto op = ::stdexec::connect(sender, expect_void_receiver(env)); + ::stdexec::start(op); + } + { + auto sender = ::stdexec::just(5) | ::exec::unless_stop_requested(); + static_assert(set_equivalent< + ::stdexec::completion_signatures_of_t, + ::stdexec::completion_signatures< + ::stdexec::set_value_t(int), + ::stdexec::set_stopped_t()>>); + auto op = ::stdexec::connect(sender, expect_value_receiver(env_tag{}, env, 5)); + ::stdexec::start(op); + } + } + + TEST_CASE( + "When stop has been requested the child operation is not started", + "[unless_stop_requested]") { + ::stdexec::inplace_stop_source source; + source.request_stop(); + auto env = ::stdexec::prop(::stdexec::get_stop_token, source.get_token()); + static_assert(!::exec::__unless_stop_requested::__unstoppable_env); + auto sender = ::stdexec::just() + | ::stdexec::then([&]() { FAIL_CHECK("Operation should not have been started"); }) + | ::exec::unless_stop_requested(); + static_assert(set_equivalent< + ::stdexec::completion_signatures_of_t, + ::stdexec::completion_signatures< + ::stdexec::set_value_t(), + ::stdexec::set_error_t(std::exception_ptr), + ::stdexec::set_stopped_t()>>); + auto op = ::stdexec::connect(sender, expect_stopped_receiver(env)); + ::stdexec::start(op); + } + + TEST_CASE("No op when the associated stop token is unstoppable", "[unless_stop_requested]") { + static_assert(::exec::__unless_stop_requested::__unstoppable_env<::stdexec::env<>>); + auto sender = ::stdexec::just() | ::exec::unless_stop_requested; + static_assert(set_equivalent< + ::stdexec::completion_signatures_of_t>, + ::stdexec::completion_signatures<::stdexec::set_value_t()>>); + auto op = ::stdexec::connect(sender, expect_void_receiver{}); + ::stdexec::start(op); + } + +} // unnamed namespace