Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions include/exec/unless_stop_requested.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2025 Robert Leahy. All rights reserved.
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
*
* Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "../stdexec/execution.hpp"

namespace exec {
namespace __unless_stop_requested {
using namespace stdexec;

template <typename _Env>
inline constexpr bool __unstoppable_env = unstoppable_token<stop_token_of_t<_Env>>;

template <typename _Receiver>
inline constexpr bool __unstoppable_receiver = __unstoppable_env<env_of_t<_Receiver>>;

template <class _Sender, class _Env>
using __completions = transform_completion_signatures<
__completion_signatures_of_t<_Sender, _Env>,
std::conditional_t<
__unstoppable_env<_Env>,
completion_signatures<>,
completion_signatures<set_stopped_t()>>>;

struct __connect_fn {
template <class _Sender, class _Receiver>
requires __unstoppable_receiver<_Receiver>
constexpr connect_result_t<__child_of<_Sender>, _Receiver>
operator()(_Sender&& __sndr, _Receiver __rcvr) const noexcept(
noexcept(stdexec::connect(__declval<__child_of<_Sender>>(), (_Receiver&&) __rcvr))) {
return __sexpr_apply((_Sender&&) __sndr, [&](auto, const auto&, auto&& __child) {
return stdexec::connect((decltype(__child)&&) __child, (_Receiver&&) __rcvr);
});
}
template <class _Sender, class _Receiver>
constexpr __op_state<_Sender, _Receiver> operator()(_Sender&& __sndr, _Receiver __rcvr) const
noexcept(__nothrow_constructible_from<__op_state<_Sender, _Receiver>, _Sender, _Receiver>) {
return __op_state<_Sender, _Receiver>{(_Sender&&) __sndr, (_Receiver&&) __rcvr};
}
};

struct unless_stop_requested_t : sender_adaptor_closure<unless_stop_requested_t> {
constexpr auto operator()() const noexcept {
return *this;
}
template <sender _Sender>
constexpr __well_formed_sender auto operator()(_Sender&& __sndr) const {
auto __domain = __get_early_domain(__sndr);
return stdexec::transform_sender(
__domain, __make_sexpr<unless_stop_requested_t>(__(), static_cast<_Sender&&>(__sndr)));
}
};

struct __unless_stop_requested_impl : __sexpr_defaults {
static constexpr auto get_completion_signatures =
[]<class _Self, class _Env>(_Self&&, _Env&&) noexcept
-> __completions<__child_of<_Self>, _Env> {
static_assert(sender_expr_for<_Self, unless_stop_requested_t>);
return {};
};

static constexpr auto start = []<class _State, class _Receiver, class _Operation>(
_State&,
_Receiver& __rcvr,
_Operation& __child_op) noexcept -> void {
static_assert(!__unstoppable_receiver<_Receiver>);
if (get_stop_token(stdexec::get_env(__rcvr)).stop_requested()) {
stdexec::set_stopped((_Receiver&&) __rcvr);
return;
}
stdexec::start(__child_op);
};

static constexpr __connect_fn connect{};
};
} // namespace __unless_stop_requested

using __unless_stop_requested::unless_stop_requested_t;
inline constexpr __unless_stop_requested::unless_stop_requested_t unless_stop_requested{};
} // namespace exec

namespace stdexec {
template <>
struct __sexpr_impl<::exec::unless_stop_requested_t>
: ::exec::__unless_stop_requested::__unless_stop_requested_impl { };
} // namespace stdexec
1 change: 1 addition & 0 deletions test/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(exec_test_sources
$<$<BOOL:${STDEXEC_ENABLE_ASIO}>:../execpools/test_asio_thread_pool.cpp>
test_system_context.cpp
$<$<BOOL:${STDEXEC_ENABLE_LIBDISPATCH}>:test_libdispatch.cpp>
test_unless_stop_requested.cpp
)

add_executable(test.exec ${exec_test_sources})
Expand Down
88 changes: 88 additions & 0 deletions test/exec/test_unless_stop_requested.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2025 Robert Leahy. All rights reserved.
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
*
* Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <exec/unless_stop_requested.hpp>

#include <exception>

#include <catch2/catch.hpp>
#include <stdexec/execution.hpp>

#include "../test_common/receivers.hpp"
#include "../test_common/type_helpers.hpp"

namespace {

TEST_CASE(
"When stop has not been requested the child operation runs normally",
"[unless_stop_requested]") {
::stdexec::inplace_stop_source source;
auto env = ::stdexec::prop(::stdexec::get_stop_token, source.get_token());
static_assert(!::exec::__unless_stop_requested::__unstoppable_env<decltype(env)>);
{
auto sender = ::stdexec::just() | ::exec::unless_stop_requested;
static_assert(
set_equivalent<
::stdexec::completion_signatures_of_t<decltype(sender), decltype(env)>,
::stdexec::completion_signatures<::stdexec::set_value_t(), ::stdexec::set_stopped_t()>>);
auto op = ::stdexec::connect(sender, expect_void_receiver(env));
::stdexec::start(op);
}
{
auto sender = ::stdexec::just(5) | ::exec::unless_stop_requested();
static_assert(set_equivalent<
::stdexec::completion_signatures_of_t<decltype(sender), decltype(env)>,
::stdexec::completion_signatures<
::stdexec::set_value_t(int),
::stdexec::set_stopped_t()>>);
auto op = ::stdexec::connect(sender, expect_value_receiver(env_tag{}, env, 5));
::stdexec::start(op);
}
}

TEST_CASE(
"When stop has been requested the child operation is not started",
"[unless_stop_requested]") {
::stdexec::inplace_stop_source source;
source.request_stop();
auto env = ::stdexec::prop(::stdexec::get_stop_token, source.get_token());
static_assert(!::exec::__unless_stop_requested::__unstoppable_env<decltype(env)>);
auto sender = ::stdexec::just()
| ::stdexec::then([&]() { FAIL_CHECK("Operation should not have been started"); })
| ::exec::unless_stop_requested();
static_assert(set_equivalent<
::stdexec::completion_signatures_of_t<decltype(sender), decltype(env)>,
::stdexec::completion_signatures<
::stdexec::set_value_t(),
::stdexec::set_error_t(std::exception_ptr),
::stdexec::set_stopped_t()>>);
auto op = ::stdexec::connect(sender, expect_stopped_receiver(env));
::stdexec::start(op);
}

TEST_CASE("No op when the associated stop token is unstoppable", "[unless_stop_requested]") {
static_assert(::exec::__unless_stop_requested::__unstoppable_env<::stdexec::env<>>);
auto sender = ::stdexec::just() | ::exec::unless_stop_requested;
static_assert(set_equivalent<
::stdexec::completion_signatures_of_t<decltype(sender), ::stdexec::env<>>,
::stdexec::completion_signatures<::stdexec::set_value_t()>>);
auto op = ::stdexec::connect(sender, expect_void_receiver{});
::stdexec::start(op);
}

} // unnamed namespace
Loading