Skip to content

Commit f0dc533

Browse files
committed
Merge branch 'on-redux' into feature/cuda_reduce
2 parents 5fb2849 + a5b3861 commit f0dc533

File tree

10 files changed

+399
-251
lines changed

10 files changed

+399
-251
lines changed

include/exec/async_scope.hpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ namespace exec {
222222
return __nest_operation_t<_Receiver>{
223223
__self.__scope_, ((_Self&&) __self).__c_, (_Receiver&&) __rcvr};
224224
}
225+
225226
template <__decays_to<__nest_sender> _Self, class _Env>
226227
friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
227228
-> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>, __env_t<_Env>> {
@@ -597,10 +598,17 @@ namespace exec {
597598

598599
////////////////////////////////////////////////////////////////////////////
599600
// async_scope::spawn implementation
601+
template <class _Env>
602+
using __spawn_env_t = __result_of<
603+
__join_env,
604+
_Env,
605+
__env::__prop<get_stop_token_t, in_place_stop_token>,
606+
__env::__prop<get_scheduler_t, __inln::__scheduler>>;
607+
600608
template <class _EnvId>
601609
struct __spawn_op_base {
602610
using _Env = __t<_EnvId>;
603-
__env_t<_Env> __env_;
611+
__spawn_env_t<_Env> __env_;
604612
void (*__delete_)(__spawn_op_base*);
605613
};
606614

@@ -623,7 +631,7 @@ namespace exec {
623631
std::terminate();
624632
}
625633

626-
friend const __env_t<_Env>& tag_invoke(get_env_t, const __spawn_rcvr& __self) noexcept {
634+
friend const __spawn_env_t<_Env>& tag_invoke(get_env_t, const __spawn_rcvr& __self) noexcept {
627635
return __self.__op_->__env_;
628636
}
629637
};
@@ -638,8 +646,9 @@ namespace exec {
638646

639647
template <__decays_to<_Sender> _Sndr>
640648
__spawn_op(_Sndr&& __sndr, _Env __env, const __impl* __scope)
641-
: __spawn_op_base<_EnvId>{make_env((_Env&&) __env,
642-
with(get_stop_token, __scope->__stop_source_.get_token())),
649+
: __spawn_op_base<_EnvId>{__join_env((_Env&&) __env,
650+
__mkprop(get_stop_token, __scope->__stop_source_.get_token()),
651+
__mkprop(get_scheduler, __inln::__scheduler{})),
643652
[](__spawn_op_base<_EnvId>* __op) {
644653
delete static_cast<__spawn_op*>(__op);
645654
}}
@@ -682,7 +691,7 @@ namespace exec {
682691
return nest_result_t<_Constrained>{&__impl_, (_Constrained&&) __c};
683692
}
684693

685-
template <__movable_value _Env = empty_env, sender_in<__env_t<_Env>> _Sender>
694+
template <__movable_value _Env = empty_env, sender_in<__spawn_env_t<_Env>> _Sender>
686695
requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
687696
void spawn(_Sender&& __sndr, _Env __env = {}) {
688697
using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;

include/exec/env.hpp

Lines changed: 1 addition & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -115,116 +115,7 @@ namespace exec {
115115

116116
inline constexpr __read_with_default::__read_with_default_t read_with_default{};
117117

118-
namespace __write {
119-
using namespace stdexec;
120-
121-
struct __write_t;
122-
123-
template <class _ReceiverId, class _Env>
124-
struct __operation_base {
125-
using _Receiver = __t<_ReceiverId>;
126-
_Receiver __rcvr_;
127-
const _Env __env_;
128-
};
129-
130-
template <class _ReceiverId, class _Env>
131-
struct __receiver {
132-
using _Receiver = stdexec::__t<_ReceiverId>;
133-
134-
struct __t : receiver_adaptor<__t> {
135-
_Receiver&& base() && noexcept {
136-
return (_Receiver&&) __op_->__rcvr_;
137-
}
138-
139-
const _Receiver& base() const & noexcept {
140-
return __op_->__rcvr_;
141-
}
142-
143-
auto get_env() const noexcept -> __env::__env_join_t<const _Env&, env_of_t<_Receiver>> {
144-
return __join_env(__op_->__env_, stdexec::get_env(base()));
145-
}
146-
147-
__operation_base<_ReceiverId, _Env>* __op_;
148-
};
149-
};
150-
151-
template <class _SenderId, class _ReceiverId, class _Env>
152-
struct __operation : __operation_base<_ReceiverId, _Env> {
153-
using _Sender = __t<_SenderId>;
154-
using __base_t = __operation_base<_ReceiverId, _Env>;
155-
using __receiver_t = __t<__receiver<_ReceiverId, _Env>>;
156-
connect_result_t<_Sender, __receiver_t> __state_;
157-
158-
__operation(_Sender&& __sndr, auto&& __rcvr, auto&& __env)
159-
: __base_t{(decltype(__rcvr)) __rcvr, (decltype(__env)) __env}
160-
, __state_{stdexec::connect((_Sender&&) __sndr, __receiver_t{{}, this})} {
161-
}
162-
163-
friend void tag_invoke(start_t, __operation& __self) noexcept {
164-
start(__self.__state_);
165-
}
166-
};
167-
168-
template <class _SenderId, class _Env>
169-
struct __sender {
170-
using _Sender = stdexec::__t<_SenderId>;
171-
172-
template <class _Receiver>
173-
using __receiver_t = stdexec::__t<__receiver<__id<_Receiver>, _Env>>;
174-
template <class _Self, class _Receiver>
175-
using __operation_t =
176-
__operation<__id<__copy_cvref_t<_Self, _Sender>>, __id<_Receiver>, _Env>;
177-
178-
struct __t {
179-
using is_sender = void;
180-
using __id = __sender;
181-
_Sender __sndr_;
182-
_Env __env_;
183-
184-
template <__decays_to<__t> _Self, receiver _Receiver>
185-
requires sender_to<__copy_cvref_t<_Self, _Sender>, __receiver_t<_Receiver>>
186-
friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
187-
-> __operation_t<_Self, _Receiver> {
188-
return {((_Self&&) __self).__sndr_, (_Receiver&&) __rcvr, ((_Self&&) __self).__env_};
189-
}
190-
191-
friend auto tag_invoke(stdexec::get_env_t, const __t& __self) //
192-
noexcept(stdexec::__nothrow_callable<stdexec::get_env_t, const _Sender&>)
193-
-> stdexec::env_of_t<const _Sender&> {
194-
return stdexec::get_env(__self.__sndr_);
195-
}
196-
197-
template <__decays_to<__t> _Self, class _BaseEnv>
198-
friend auto tag_invoke(get_completion_signatures_t, _Self&&, _BaseEnv&&)
199-
-> stdexec::__completion_signatures_of_t<
200-
__copy_cvref_t<_Self, _Sender>,
201-
__env::__env_join_t<_Env, _BaseEnv>> {
202-
return {};
203-
}
204-
};
205-
};
206-
207-
struct __write_t {
208-
template <class _Sender, class... _Funs>
209-
using __sender_t =
210-
__t<__sender<__id<__decay_t<_Sender>>, __env::__env_join_t<__env::__env_fn<_Funs>...>>>;
211-
212-
template <__is_not_instance_of<__env::__env_fn> _Sender, class... _Funs>
213-
requires sender<_Sender>
214-
auto operator()(_Sender&& __sndr, __env::__env_fn<_Funs>... __withs) const
215-
-> __sender_t<_Sender, _Funs...> {
216-
return {(_Sender&&) __sndr, __join_env(std::move(__withs)...)};
217-
}
218-
219-
template <class... _Funs>
220-
auto operator()(__env::__env_fn<_Funs>... __withs) const
221-
-> __binder_back<__write_t, __env::__env_fn<_Funs>...> {
222-
return {{}, {}, {std::move(__withs)...}};
223-
}
224-
};
225-
} // namespace __write
226-
227-
inline constexpr __write::__write_t write{};
118+
inline constexpr stdexec::__write_::__write_t write{};
228119
} // namespace exec
229120

230121
#ifdef __EDG__

include/exec/static_thread_pool.hpp

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ namespace exec {
9393
}
9494

9595
// For eager customization
96-
template <stdexec::__lazy_sender_for<stdexec::bulk_t> Sender>
96+
template <stdexec::sender_expr_for<stdexec::bulk_t> Sender>
9797
auto transform_sender(Sender&& sndr) const noexcept {
9898
auto sched = stdexec::get_completion_scheduler<stdexec::set_value_t>(
9999
stdexec::get_env(sndr));
@@ -106,7 +106,7 @@ namespace exec {
106106
}
107107

108108
// transform the generic bulk sender into a parallel thread-pool bulk sender
109-
template <stdexec::__lazy_sender_for<stdexec::bulk_t> Sender, class Env>
109+
template <stdexec::sender_expr_for<stdexec::bulk_t> Sender, class Env>
110110
requires stdexec::__callable<stdexec::get_scheduler_t, Env>
111111
auto transform_sender(Sender&& sndr, const Env& env) const noexcept {
112112
auto sched = stdexec::get_scheduler(env);
@@ -182,17 +182,6 @@ namespace exec {
182182
return s.make_sender_();
183183
}
184184

185-
// template <class S, class Shape, class Fn>
186-
// bulk_sender_t<S, Shape, Fn> make_bulk_sender_(S&& sndr, Shape shape, Fn fun) const {
187-
// return bulk_sender_t<S, Shape, Fn>{*pool_, (S&&) sndr, shape, (Fn&&) fun};
188-
// }
189-
190-
// template <stdexec::sender S, std::integral Shape, class Fn>
191-
// friend bulk_sender_t<S, Shape, Fn>
192-
// tag_invoke(stdexec::bulk_t, scheduler sch, S&& sndr, Shape shape, Fn fun) noexcept {
193-
// return sch.make_bulk_sender_((S&&) sndr, shape, (Fn&&) fun);
194-
// }
195-
196185
friend stdexec::forward_progress_guarantee
197186
tag_invoke(stdexec::get_forward_progress_guarantee_t, const static_thread_pool&) noexcept {
198187
return stdexec::forward_progress_guarantee::parallel;

include/nvexec/stream/reduce.cuh

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,17 @@ namespace nvexec {
188188
// #if STDEXEC_FRIENDSHIP_IS_LEXICAL()
189189
// private:
190190
// template <class...>
191-
// friend struct stdexec::__basic_sender;
191+
// friend struct stdexec::__sexpr;
192192
// #endif
193193

194194
template < sender Sender, __movable_value Init, __movable_value Fun = cub::Sum>
195195
auto operator()(Sender&& sndr, Init init, Fun fun) const {
196196
auto __domain = __get_sender_domain(sndr);
197-
return __domain.transform_sender(make_sender<reduce_t>(
197+
return __domain.transform_sender(make_sender_expr<reduce_t>(
198198
reduce_::__data{(Init&&) init, (Fun&&) fun}, (Sender&&) sndr));
199199
}
200200

201-
template <__lazy_sender_for<reduce_t> _Sender>
201+
template <sender_expr_for<reduce_t> _Sender>
202202
static auto get_env(const _Sender&) noexcept {
203203
return empty_env{};
204204
}
@@ -213,7 +213,7 @@ namespace nvexec {
213213
}
214214
};
215215

216-
template <__lazy_sender_for<reduce_t> _Sender, receiver _Receiver>
216+
template <sender_expr_for<reduce_t> _Sender, receiver _Receiver>
217217
//requires SOME CONSTRAINT HERE
218218
static auto connect(_Sender&& __sndr, _Receiver __rcvr) {
219219
return op{}; // return a dummy operation state to see if it compiles
@@ -231,7 +231,7 @@ namespace nvexec {
231231
completion_signatures<set_stopped_t()>,
232232
__mbind_back_q<_set_value_t, _Init, _Fun>>;
233233

234-
template <__lazy_sender_for<reduce_t> _Sender, class _Env>
234+
template <sender_expr_for<reduce_t> _Sender, class _Env>
235235
static auto get_completion_signatures(_Sender&& __sndr, _Env&& env) {
236236
// what's the relationship(if it exists) between the lambdas types and the lambda types in `stream_domain::transform_sender`
237237
// apply_sender?
@@ -264,7 +264,7 @@ namespace nvexec {
264264
_Fun),
265265
tag_invoke_t(reduce_t, _Sender, _Init, _Fun)>;
266266

267-
template <__lazy_sender_for<reduce_t> _Sender, receiver _Receiver>
267+
template <sender_expr_for<reduce_t> _Sender, receiver _Receiver>
268268
static auto connect(_Sender&& __sndr, _Receiver __rcvr) noexcept(
269269
__nothrow_callable< apply_sender_t, _Sender, reduce_::__connect_fn<_Receiver>>)
270270
-> __call_result_t< apply_sender_t, _Sender, reduce_::__connect_fn<_Receiver>> {

include/nvexec/stream_context.cuh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,19 @@ namespace nvexec {
9999
using __default_domain::transform_sender;
100100

101101
// Lazy algorithm customizations require a recursive tree transformation
102-
template <__lazy_sender Sender, class Env>
102+
template <sender_expr Sender, class Env>
103103
requires _non_stream_sender<Sender> // no need to transform it a second time
104104
auto transform_sender(Sender&& sndr, const Env& env) const noexcept {
105105
return stdexec::apply_sender(
106106
(Sender&&) sndr,
107107
[&]<class Tag, class Data, class... Children>(Tag, Data&& data, Children&&... children) {
108-
return make_sender<Tag, stream_domain>(
108+
return make_sender_expr<Tag, stream_domain>(
109109
(Data&&) data, transform_sender((Children&&) children, env)...);
110110
});
111111
}
112112

113113
// reduce senders get a special transformation
114-
template <__lazy_sender_for<reduce_t> Sender, class Env>
114+
template <sender_expr_for<reduce_t> Sender, class Env>
115115
requires _non_stream_sender<Sender> // no need to transform it a second time
116116
auto transform_sender(Sender&& sndr, const Env& env) const noexcept {
117117
return stdexec::apply_sender(

0 commit comments

Comments
 (0)