Skip to content

Commit 531d86a

Browse files
committed
exec::unless_stop_requested
1 parent 138e136 commit 531d86a

File tree

3 files changed

+191
-0
lines changed

3 files changed

+191
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* Copyright (c) 2025 Robert Leahy. All rights reserved.
4+
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
5+
*
6+
* Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://llvm.org/LICENSE.txt
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#pragma once
20+
21+
#include "../stdexec/execution.hpp"
22+
23+
namespace exec {
24+
namespace __unless_stop_requested {
25+
using namespace stdexec;
26+
27+
template <typename _Env>
28+
inline constexpr bool __unstoppable_env = unstoppable_token<stop_token_of_t<_Env>>;
29+
30+
template <typename _Receiver>
31+
inline constexpr bool __unstoppable_receiver = __unstoppable_env<env_of_t<_Receiver>>;
32+
33+
template <class _Sender, class _Env>
34+
using __completions = transform_completion_signatures<
35+
__completion_signatures_of_t<_Sender, _Env>,
36+
std::conditional_t<
37+
__unstoppable_env<_Env>,
38+
completion_signatures<>,
39+
completion_signatures<set_stopped_t()>>>;
40+
41+
struct __connect_fn {
42+
template <class _Sender, class _Receiver>
43+
requires __unstoppable_receiver<_Receiver>
44+
constexpr connect_result_t<__child_of<_Sender>, _Receiver>
45+
operator()(_Sender&& __sndr, _Receiver __rcvr) const noexcept(
46+
noexcept(stdexec::connect(__declval<__child_of<_Sender>>(), (_Receiver&&) __rcvr))) {
47+
return __sexpr_apply((_Sender&&) __sndr, [&](auto, const auto&, auto&& __child) {
48+
return stdexec::connect((decltype(__child)&&) __child, (_Receiver&&) __rcvr);
49+
});
50+
}
51+
template <class _Sender, class _Receiver>
52+
constexpr __op_state<_Sender, _Receiver> operator()(_Sender&& __sndr, _Receiver __rcvr) const
53+
noexcept(__nothrow_constructible_from<__op_state<_Sender, _Receiver>, _Sender, _Receiver>) {
54+
return __op_state<_Sender, _Receiver>{(_Sender&&) __sndr, (_Receiver&&) __rcvr};
55+
}
56+
};
57+
58+
struct unless_stop_requested_t : sender_adaptor_closure<unless_stop_requested_t> {
59+
constexpr auto operator()() const noexcept {
60+
return *this;
61+
}
62+
template <sender _Sender>
63+
constexpr __well_formed_sender auto operator()(_Sender&& __sndr) const {
64+
auto __domain = __get_early_domain(__sndr);
65+
return stdexec::transform_sender(
66+
__domain, __make_sexpr<unless_stop_requested_t>(__(), static_cast<_Sender&&>(__sndr)));
67+
}
68+
};
69+
70+
struct __unless_stop_requested_impl : __sexpr_defaults {
71+
static constexpr auto get_completion_signatures =
72+
[]<class _Self, class _Env>(_Self&&, _Env&&) noexcept
73+
-> __completions<__child_of<_Self>, _Env> {
74+
static_assert(sender_expr_for<_Self, unless_stop_requested_t>);
75+
return {};
76+
};
77+
78+
static constexpr auto start = []<class _State, class _Receiver, class _Operation>(
79+
_State&,
80+
_Receiver& __rcvr,
81+
_Operation& __child_op) noexcept -> void {
82+
static_assert(!__unstoppable_receiver<_Receiver>);
83+
if (get_stop_token(stdexec::get_env(__rcvr)).stop_requested()) {
84+
stdexec::set_stopped((_Receiver&&) __rcvr);
85+
return;
86+
}
87+
stdexec::start(__child_op);
88+
};
89+
90+
static constexpr __connect_fn connect{};
91+
};
92+
} // namespace __unless_stop_requested
93+
94+
using __unless_stop_requested::unless_stop_requested_t;
95+
inline constexpr __unless_stop_requested::unless_stop_requested_t unless_stop_requested{};
96+
} // namespace exec
97+
98+
namespace stdexec {
99+
template <>
100+
struct __sexpr_impl<::exec::unless_stop_requested_t>
101+
: ::exec::__unless_stop_requested::__unless_stop_requested_impl { };
102+
} // namespace stdexec

test/exec/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ set(exec_test_sources
5555
$<$<BOOL:${STDEXEC_ENABLE_ASIO}>:../execpools/test_asio_thread_pool.cpp>
5656
test_system_context.cpp
5757
$<$<BOOL:${STDEXEC_ENABLE_LIBDISPATCH}>:test_libdispatch.cpp>
58+
test_unless_stop_requested.cpp
5859
)
5960

6061
add_executable(test.exec ${exec_test_sources})
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* Copyright (c) 2025 Robert Leahy. All rights reserved.
4+
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
5+
*
6+
* Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://llvm.org/LICENSE.txt
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#include <exec/unless_stop_requested.hpp>
20+
21+
#include <exception>
22+
23+
#include <catch2/catch.hpp>
24+
#include <stdexec/execution.hpp>
25+
26+
#include "../test_common/receivers.hpp"
27+
#include "../test_common/type_helpers.hpp"
28+
29+
namespace {
30+
31+
TEST_CASE(
32+
"When stop has not been requested the child operation runs normally",
33+
"[unless_stop_requested]") {
34+
::stdexec::inplace_stop_source source;
35+
auto env = ::stdexec::prop(::stdexec::get_stop_token, source.get_token());
36+
static_assert(!::exec::__unless_stop_requested::__unstoppable_env<decltype(env)>);
37+
{
38+
auto sender = ::stdexec::just() | ::exec::unless_stop_requested;
39+
static_assert(
40+
set_equivalent<
41+
::stdexec::completion_signatures_of_t<decltype(sender), decltype(env)>,
42+
::stdexec::completion_signatures<::stdexec::set_value_t(), ::stdexec::set_stopped_t()>>);
43+
auto op = ::stdexec::connect(sender, expect_void_receiver(env));
44+
::stdexec::start(op);
45+
}
46+
{
47+
auto sender = ::stdexec::just(5) | ::exec::unless_stop_requested();
48+
static_assert(set_equivalent<
49+
::stdexec::completion_signatures_of_t<decltype(sender), decltype(env)>,
50+
::stdexec::completion_signatures<
51+
::stdexec::set_value_t(int),
52+
::stdexec::set_stopped_t()>>);
53+
auto op = ::stdexec::connect(sender, expect_value_receiver(env_tag{}, env, 5));
54+
::stdexec::start(op);
55+
}
56+
}
57+
58+
TEST_CASE(
59+
"When stop has been requested the child operation is not started",
60+
"[unless_stop_requested]") {
61+
::stdexec::inplace_stop_source source;
62+
source.request_stop();
63+
auto env = ::stdexec::prop(::stdexec::get_stop_token, source.get_token());
64+
static_assert(!::exec::__unless_stop_requested::__unstoppable_env<decltype(env)>);
65+
auto sender = ::stdexec::just()
66+
| ::stdexec::then([&]() { FAIL_CHECK("Operation should not have been started"); })
67+
| ::exec::unless_stop_requested();
68+
static_assert(set_equivalent<
69+
::stdexec::completion_signatures_of_t<decltype(sender), decltype(env)>,
70+
::stdexec::completion_signatures<
71+
::stdexec::set_value_t(),
72+
::stdexec::set_error_t(std::exception_ptr),
73+
::stdexec::set_stopped_t()>>);
74+
auto op = ::stdexec::connect(sender, expect_stopped_receiver(env));
75+
::stdexec::start(op);
76+
}
77+
78+
TEST_CASE("No op when the associated stop token is unstoppable", "[unless_stop_requested]") {
79+
static_assert(::exec::__unless_stop_requested::__unstoppable_env<::stdexec::env<>>);
80+
auto sender = ::stdexec::just() | ::exec::unless_stop_requested;
81+
static_assert(set_equivalent<
82+
::stdexec::completion_signatures_of_t<decltype(sender), ::stdexec::env<>>,
83+
::stdexec::completion_signatures<::stdexec::set_value_t()>>);
84+
auto op = ::stdexec::connect(sender, expect_void_receiver{});
85+
::stdexec::start(op);
86+
}
87+
88+
} // unnamed namespace

0 commit comments

Comments
 (0)