Skip to content

Commit b94bd4d

Browse files
committed
Merge remote-tracking branch 'origin/main' into feature/cuda_reduce
2 parents f0dc533 + 384c59c commit b94bd4d

File tree

19 files changed

+385
-333
lines changed

19 files changed

+385
-333
lines changed

examples/nvexec/maxwell/snr.cuh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ struct repeat_n_sender_t {
268268

269269
#if defined(_NVHPC_CUDA) || defined(__CUDACC__)
270270
template <stdexec::__decays_to<repeat_n_sender_t> Self, stdexec::receiver Receiver>
271-
requires(stdexec::tag_invocable<stdexec::connect_t, Sender, Receiver>)
271+
requires(stdexec::sender_to<Sender, Receiver>)
272272
&& (!nvexec::STDEXEC_STREAM_DETAIL_NS::receiver_with_stream_env<Receiver>)
273273
friend auto tag_invoke(stdexec::connect_t, Self&& self, Receiver r)
274274
-> repeat_n_detail::operation_state_t<SenderId, ClosureId, stdexec::__id<Receiver>> {
@@ -277,7 +277,7 @@ struct repeat_n_sender_t {
277277
}
278278

279279
template <stdexec::__decays_to<repeat_n_sender_t> Self, stdexec::receiver Receiver>
280-
requires(stdexec::tag_invocable<stdexec::connect_t, Sender, Receiver>)
280+
requires(stdexec::sender_to<Sender, Receiver>)
281281
&& (nvexec::STDEXEC_STREAM_DETAIL_NS::receiver_with_stream_env<Receiver>)
282282
friend auto tag_invoke(stdexec::connect_t, Self&& self, Receiver r)
283283
-> nvexec::STDEXEC_STREAM_DETAIL_NS::repeat_n::
@@ -288,7 +288,7 @@ struct repeat_n_sender_t {
288288
}
289289
#else
290290
template <stdexec::__decays_to<repeat_n_sender_t> Self, stdexec::receiver Receiver>
291-
requires stdexec::tag_invocable<stdexec::connect_t, Sender, Receiver>
291+
requires stdexec::sender_to<Sender, Receiver>
292292
friend auto tag_invoke(stdexec::connect_t, Self&& self, Receiver r)
293293
-> repeat_n_detail::operation_state_t<SenderId, ClosureId, stdexec::__id<Receiver>> {
294294
return repeat_n_detail::operation_state_t<SenderId, ClosureId, stdexec::__id<Receiver>>(

include/exec/linux/io_uring_context.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ namespace exec {
259259
// This function first completes all tasks that are ready in the completion queue of the io_uring.
260260
// Then it completes all tasks that are ready in the given queue of ready tasks.
261261
// The function returns the number of previously submitted completed tasks.
262-
int complete(stdexec::__intrusive_queue<&__task::__next_> __ready = __task_queue{}) noexcept {
262+
int
263+
complete(stdexec::__intrusive_queue<& __task::__next_> __ready = __task_queue{}) noexcept {
263264
__u32 __head = __head_.load(std::memory_order_relaxed);
264265
__u32 __tail = __tail_.load(std::memory_order_acquire);
265266
int __count = 0;

include/exec/static_thread_pool.hpp

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,6 @@ namespace exec {
8787
};
8888

8989
struct domain {
90-
template <class Sender>
91-
Sender transform_sender(Sender&& sndr) const noexcept {
92-
return static_cast<Sender&&>(sndr);
93-
}
94-
9590
// For eager customization
9691
template <stdexec::sender_expr_for<stdexec::bulk_t> Sender>
9792
auto transform_sender(Sender&& sndr) const noexcept {
@@ -100,11 +95,6 @@ namespace exec {
10095
return stdexec::apply_sender((Sender&&) sndr, transform_bulk{*sched.pool_});
10196
}
10297

103-
template <class Sender, class Env>
104-
Sender transform_sender(Sender&& sndr, const Env&) const noexcept {
105-
return static_cast<Sender&&>(sndr);
106-
}
107-
10898
// transform the generic bulk sender into a parallel thread-pool bulk sender
10999
template <stdexec::sender_expr_for<stdexec::bulk_t> Sender, class Env>
110100
requires stdexec::__callable<stdexec::get_scheduler_t, Env>

include/nvexec/multi_gpu_context.cuh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ namespace nvexec {
174174
Senders&&... sndrs) noexcept {
175175
return transfer_when_all_sender_th<
176176
multi_gpu_stream_scheduler,
177-
tag_invoke_result_t<into_variant_t, Senders>...>(
177+
__result_of<into_variant, Senders>...>(
178178
sch.context_state_, into_variant((Senders&&) sndrs)...);
179179
}
180180

include/nvexec/stream/common.cuh

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -330,16 +330,12 @@ namespace nvexec {
330330

331331
template <class BaseEnv>
332332
auto make_stream_env(BaseEnv&& base_env, stream_provider_t* stream_provider) noexcept {
333-
return __join_env(
334-
__env::__env_fn{[stream_provider](get_stream_provider_t) noexcept {
335-
return stream_provider;
336-
}},
337-
(BaseEnv&&) base_env);
333+
return __join_env(__mkprop(get_stream_provider, stream_provider), (BaseEnv&&) base_env);
338334
}
339335

340336
template <class BaseEnv>
341337
requires __callable<get_stream_provider_t, const BaseEnv&>
342-
BaseEnv make_stream_env(BaseEnv&& base_env, stream_provider_t*) noexcept {
338+
__decay_t<BaseEnv> make_stream_env(BaseEnv&& base_env, stream_provider_t*) noexcept {
343339
return (BaseEnv&&) base_env;
344340
}
345341

@@ -367,10 +363,12 @@ namespace nvexec {
367363
template <class BaseEnv>
368364
using make_terminal_stream_env_t = terminal_stream_env<BaseEnv>;
369365

370-
template <class S>
366+
template <class S, class E>
371367
concept stream_sender = //
372-
sender<S> && //
373-
STDEXEC_IS_BASE_OF(stream_sender_base, __decay_t<S>);
368+
sender_in<S, E> && //
369+
STDEXEC_IS_BASE_OF(
370+
stream_sender_base,
371+
__decay_t<transform_sender_result_t<__env_domain_of_t<E>, S, E>>);
374372

375373
template <class R>
376374
concept stream_receiver = //
@@ -381,7 +379,7 @@ namespace nvexec {
381379

382380
template <class EnvId, class Variant>
383381
struct stream_enqueue_receiver {
384-
using Env = stdexec::__t<EnvId>;
382+
using Env = stdexec::__cvref_t<EnvId>;
385383

386384
class __t {
387385
Env* env_;
@@ -630,9 +628,9 @@ namespace nvexec {
630628

631629
using task_t = continuation_task_t<inner_receiver_t, variant_t>;
632630
using stream_enqueue_receiver_t =
633-
stdexec::__t<stream_enqueue_receiver<stdexec::__id<env_t>, variant_t>>;
631+
stdexec::__t<stream_enqueue_receiver<stdexec::__cvref_id<env_t>, variant_t>>;
634632
using intermediate_receiver =
635-
__if_c<stream_sender<sender_t>, inner_receiver_t, stream_enqueue_receiver_t>;
633+
__if_c<stream_sender<sender_t, env_t>, inner_receiver_t, stream_enqueue_receiver_t>;
636634
using inner_op_state_t = connect_result_t<sender_t, intermediate_receiver>;
637635

638636
friend void tag_invoke(start_t, __t& op) noexcept {
@@ -661,7 +659,7 @@ namespace nvexec {
661659
}
662660

663661
template <__decays_to<outer_receiver_t> OutR, class ReceiverProvider>
664-
requires stream_sender<sender_t>
662+
requires stream_sender<sender_t, env_t>
665663
__t(
666664
sender_t&& sender,
667665
OutR&& out_receiver,

include/nvexec/stream/ensure_started.cuh

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
6262
friend void tag_invoke(Tag, __t&& self, As&&... as) noexcept {
6363
SharedState& state = *self.shared_state_;
6464

65-
if constexpr (stream_sender<Sender>) {
65+
if constexpr (stream_sender<Sender, env_t>) {
6666
cudaStream_t stream = state.stream_provider_.own_stream_.value();
6767
using tuple_t = decayed_tuple<Tag, As...>;
6868
state.index_ = SharedState::variant_t::template index_of<tuple_t>::value;
@@ -109,10 +109,10 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
109109
using inner_receiver_t = stdexec::__t<receiver_t<SenderId, sh_state_t>>;
110110
using task_t = continuation_task_t<inner_receiver_t, variant_t>;
111111
using enqueue_receiver_t =
112-
stdexec::__t<stream_enqueue_receiver<stdexec::__id<env_t>, variant_t>>;
112+
stdexec::__t<stream_enqueue_receiver<stdexec::__cvref_id<env_t>, variant_t>>;
113113
using intermediate_receiver = //
114114
stdexec::__t< std::conditional_t<
115-
stream_sender<Sender>,
115+
stream_sender<Sender, env_t>,
116116
stdexec::__id<inner_receiver_t>,
117117
stdexec::__id<enqueue_receiver_t>>>;
118118
using inner_op_state_t = connect_result_t<Sender, intermediate_receiver>;
@@ -135,7 +135,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
135135
}
136136

137137
explicit sh_state_t(Sender& sndr, context_state_t context_state)
138-
requires(stream_sender<Sender>)
138+
requires(stream_sender<Sender, env_t>)
139139
: context_state_(context_state)
140140
, stream_provider_(false, context_state)
141141
, data_(malloc_managed<variant_t>(stream_provider_.status_))
@@ -170,7 +170,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
170170

171171
~sh_state_t() {
172172
if (stream_provider_.status_ == cudaSuccess) {
173-
if constexpr (stream_sender<Sender>) {
173+
if constexpr (stream_sender<Sender, env_t>) {
174174
STDEXEC_DBG_ERR(cudaEventDestroy(event_));
175175
}
176176
}
@@ -244,7 +244,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
244244
cudaError_t& status = op->shared_state_->stream_provider_.status_;
245245

246246
if (status == cudaSuccess) {
247-
if constexpr (stream_sender<Sender>) {
247+
if constexpr (stream_sender<Sender, env_t>) {
248248
status = STDEXEC_DBG_ERR(
249249
cudaStreamWaitEvent(op->get_stream(), op->shared_state_->event_));
250250
}

include/nvexec/stream/schedule_from.cuh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,10 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
223223
};
224224
};
225225
}
226+
227+
namespace stdexec::__detail {
228+
template <class _Scheduler, class _SenderId>
229+
extern __mconst<nvexec::STDEXEC_STREAM_DETAIL_NS::
230+
schedule_from_sender_t<_Scheduler, __name_of<__t<_SenderId>> > >
231+
__name_of_v<nvexec::STDEXEC_STREAM_DETAIL_NS::schedule_from_sender_t<_Scheduler, _SenderId>>;
232+
}

include/nvexec/stream/split.cuh

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
6060
friend void tag_invoke(Tag, __t&& self, As&&... as) noexcept {
6161
SharedState& state = self.sh_state_;
6262

63-
if constexpr (stream_sender<Sender>) {
63+
if constexpr (stream_sender<Sender, env_t>) {
6464
cudaStream_t stream = state.op_state2_.get_stream();
6565
using tuple_t = decayed_tuple<Tag, As...>;
6666
state.index_ = SharedState::variant_t::template index_of<tuple_t>::value;
@@ -111,10 +111,10 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
111111
using inner_receiver_t = stdexec::__t<receiver_t<stdexec::__id<Sender>, sh_state_t>>;
112112
using task_t = continuation_task_t<inner_receiver_t, variant_t>;
113113
using enqueue_receiver_t =
114-
stdexec::__t<stream_enqueue_receiver<stdexec::__id<env_t>, variant_t>>;
114+
stdexec::__t<stream_enqueue_receiver<stdexec::__cvref_id<env_t>, variant_t>>;
115115
using intermediate_receiver = //
116116
stdexec::__t< std::conditional_t<
117-
stream_sender<Sender>,
117+
stream_sender<Sender, env_t>,
118118
stdexec::__id<inner_receiver_t>,
119119
stdexec::__id<enqueue_receiver_t>>>;
120120
using inner_op_state_t = connect_result_t<Sender, intermediate_receiver>;
@@ -133,7 +133,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
133133
::cuda::std::atomic_flag started_{};
134134

135135
explicit sh_state_t(Sender& sndr, context_state_t context_state)
136-
requires(stream_sender<Sender>)
136+
requires(stream_sender<Sender, env_t>)
137137
: context_state_(context_state)
138138
, stream_provider_(false, context_state)
139139
, data_(malloc_managed<variant_t>(stream_provider_.status_))
@@ -171,7 +171,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
171171

172172
if (data_) {
173173
STDEXEC_DBG_ERR(cudaFree(data_));
174-
if constexpr (stream_sender<Sender>) {
174+
if constexpr (stream_sender<Sender, env_t>) {
175175
STDEXEC_DBG_ERR(cudaEventDestroy(event_));
176176
}
177177
}
@@ -234,7 +234,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
234234

235235
cudaError_t& status = op->shared_state_->stream_provider_.status_;
236236
if (status == cudaSuccess) {
237-
if constexpr (stream_sender<Sender>) {
237+
if constexpr (stream_sender<Sender, env_t>) {
238238
status = STDEXEC_DBG_ERR(
239239
cudaStreamWaitEvent(op->get_stream(), op->shared_state_->event_));
240240
}

include/nvexec/stream/transfer.cuh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
5757
::cuda::std::atomic_flag started_{};
5858

5959
using enqueue_receiver =
60-
stdexec::__t<stream_enqueue_receiver<stdexec::__id<Env>, variant_t>>;
60+
stdexec::__t<stream_enqueue_receiver<stdexec::__cvref_id<Env>, variant_t>>;
6161
using inner_op_state_t = connect_result_t<Sender, enqueue_receiver>;
6262
host_ptr<__decay_t<Env>> env_{};
6363
inner_op_state_t inner_op_;

include/nvexec/stream_context.cuh

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,7 @@ namespace nvexec {
298298
transfer_when_all_with_variant_t, //
299299
const stream_scheduler& sch, //
300300
Senders&&... sndrs) noexcept {
301-
return transfer_when_all_sender_th<
302-
stream_scheduler,
303-
tag_invoke_result_t<into_variant_t, Senders>...>(
301+
return transfer_when_all_sender_th< stream_scheduler, __result_of<into_variant, Senders>...>(
304302
sch.context_state_, into_variant((Senders&&) sndrs)...);
305303
}
306304

@@ -354,9 +352,9 @@ namespace nvexec {
354352
}
355353

356354
template <stream_completing_sender... Senders>
357-
when_all_sender_th< stream_scheduler, tag_invoke_result_t<into_variant_t, Senders>...>
355+
when_all_sender_th< stream_scheduler, __result_of<into_variant, Senders>...>
358356
tag_invoke(when_all_with_variant_t, Senders&&... sndrs) noexcept {
359-
return when_all_sender_th< stream_scheduler, tag_invoke_result_t<into_variant_t, Senders>...>{
357+
return when_all_sender_th< stream_scheduler, __result_of<into_variant, Senders>...>{
360358
context_state_t{nullptr, nullptr, nullptr, nullptr},
361359
into_variant((Senders&&) sndrs)...
362360
};

0 commit comments

Comments
 (0)