| 
36 | 36 | #include "stream/when_all.cuh"  | 
37 | 37 | #include "stream/reduce.cuh"  | 
38 | 38 | #include "stream/ensure_started.cuh"  | 
 | 39 | +#include "stream/wrap.cuh"  | 
39 | 40 | 
 
  | 
40 | 41 | #include "stream/common.cuh"  | 
41 | 42 | #include "detail/queue.cuh"  | 
@@ -87,41 +88,108 @@ namespace nvexec {  | 
87 | 88 |     template <sender Sender>  | 
88 | 89 |     using ensure_started_th = __t<ensure_started_sender_t<__id<Sender>>>;  | 
89 | 90 | 
 
  | 
90 |  | -    // needed for subsumption purposes  | 
91 |  | -    template <class Sender, class Env>  | 
92 |  | -    concept _non_stream_sender = //  | 
93 |  | -      !derived_from<__decay_t<Sender>, stream_sender_base>;  | 
94 |  | - | 
95 | 91 |     struct stream_scheduler;  | 
96 | 92 | 
 
  | 
97 |  | -    template <class = stream_scheduler>  | 
 | 93 | +    // template <class = stream_scheduler>  | 
 | 94 | +    // struct stream_domain;  | 
 | 95 | + | 
 | 96 | +    // template <class Tag>  | 
 | 97 | +    // struct _just_t : Tag {  | 
 | 98 | +    //   static __prop<get_domain_t, stream_domain<>> get_env(auto&&) noexcept {  | 
 | 99 | +    //     return __mkprop  | 
 | 100 | +    //   }  | 
 | 101 | +    // }  | 
 | 102 | + | 
 | 103 | +    template <class /*= stream_scheduler*/>  | 
98 | 104 |     struct stream_domain : private __default_domain<context_state_t> {  | 
99 | 105 |       using __default_domain::__default_domain;  | 
100 |  | -      using __default_domain::transform_sender;  | 
 | 106 | +      //using __default_domain::transform_sender;  | 
101 | 107 | 
 
  | 
102 | 108 |       // Lazy algorithm customizations require a recursive tree transformation  | 
103 | 109 |       template <sender_expr Sender, class Env>  | 
104 | 110 |         requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
105 | 111 |       auto transform_sender(Sender&& sndr, const Env& env) const noexcept {  | 
106 |  | -        return stdexec::apply_sender(  | 
 | 112 | +        //print<__name_of<Sender>>();  | 
 | 113 | +        auto result = stdexec::apply_sender(  | 
107 | 114 |           (Sender&&) sndr,  | 
108 | 115 |           [&]<class Tag, class Data, class... Children>(Tag, Data&& data, Children&&... children) {  | 
109 |  | -            return make_sender_expr<Tag, stream_domain>(  | 
110 |  | -              (Data&&) data, transform_sender((Children&&) children, env)...);  | 
 | 116 | +            return //as_stream_sender<Env>(  | 
 | 117 | +              make_sender_expr<Tag, stream_domain>(  | 
 | 118 | +                (Data&&) data,  | 
 | 119 | +                stdexec::transform_sender(*this, (Children&&) children, env)...); //,  | 
 | 120 | +              //base());  | 
111 | 121 |           });  | 
 | 122 | +        //print<__name_of<decltype(result)>>();  | 
 | 123 | +        return result;  | 
112 | 124 |       }  | 
113 | 125 | 
 
  | 
114 |  | -      // reduce senders get a special transformation  | 
115 |  | -      template <sender_expr_for<reduce_t> Sender, class Env>  | 
 | 126 | +      template <sender_expr_for<schedule_from_t> Sender, class Env>  | 
116 | 127 |         requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
117 | 128 |       auto transform_sender(Sender&& sndr, const Env& env) const noexcept {  | 
118 | 129 |         return stdexec::apply_sender(  | 
119 | 130 |           (Sender&&) sndr,  | 
120 | 131 |           [&]<class Tag, class Data, class Child>(Tag, Data&& data, Child&& child) {  | 
121 |  | -            auto [init, fun] = (Data&&) data;  | 
122 |  | -            auto next = transform_sender((Child&&) child, env);  | 
123 |  | -            return reduce_sender_t<decltype(next), decltype(init), decltype(fun)>(  | 
124 |  | -              std::move(next), init, fun);  | 
 | 132 | +            auto sched = get_scheduler(env);  | 
 | 133 | +            auto next = stdexec::transform_sender(*this, (Child&&) child, env);  | 
 | 134 | +            return stdexec::__t<  | 
 | 135 | +              schedule_from_sender_t<stream_scheduler, stdexec::__id<decltype(next)>>>{  | 
 | 136 | +              sched.context_state_, std::move(next)};  | 
 | 137 | +          });  | 
 | 138 | +      }  | 
 | 139 | + | 
 | 140 | +      // // reduce senders get a special transformation  | 
 | 141 | +      // template <sender_expr_for<reduce_t> Sender, class Env>  | 
 | 142 | +      //   requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
 | 143 | +      // auto transform_sender(Sender&& sndr, const Env& env) const noexcept {  | 
 | 144 | +      //   return stdexec::apply_sender(  | 
 | 145 | +      //     (Sender&&) sndr,  | 
 | 146 | +      //     [&]<class Tag, class Data, class Child>(Tag, Data&& data, Child&& child) {  | 
 | 147 | +      //       auto [init, fun] = (Data&&) data;  | 
 | 148 | +      //       auto next = stdexec::transform_sender(*this, (Child&&) child, env);  | 
 | 149 | +      //       return reduce_sender_t<decltype(next), decltype(init), decltype(fun)>(  | 
 | 150 | +      //         std::move(next), init, fun);  | 
 | 151 | +      //     });  | 
 | 152 | +      // }  | 
 | 153 | + | 
 | 154 | +      // transform senders get a special transformation  | 
 | 155 | +      template <sender_expr_for<transfer_t> Sender, class Env>  | 
 | 156 | +        requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
 | 157 | +      auto transform_sender(Sender&& sndr, const Env& env) const noexcept {  | 
 | 158 | +        return stdexec::apply_sender(  | 
 | 159 | +          (Sender&&) sndr, [&]<class Data, class Child>(__ignore, Data&& data, Child&& child) {  | 
 | 160 | +            auto from = get_scheduler(env);  | 
 | 161 | +            auto to = get_completion_scheduler<set_value_t>(data);  | 
 | 162 | +            auto next = stdexec::transform_sender(*this, (Child&&) child, env);  | 
 | 163 | +            auto transfer = __t<transfer_sender_t<decltype(next)>>(  | 
 | 164 | +              from.context_state_, std::move(next));  | 
 | 165 | +            return schedule_from(to, std::move(transfer));  | 
 | 166 | +          });  | 
 | 167 | +      }  | 
 | 168 | + | 
 | 169 | +      // template <sender_expr_for<just_t, just_error_t, just_stopped_t> Sender, class Env>  | 
 | 170 | +      //   requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
 | 171 | +      // auto transform_sender(Sender&& sndr, const Env& env) const noexcept {  | 
 | 172 | +      //   return stdexec::apply_sender(  | 
 | 173 | +      //     (Sender&&) sndr, [&]<class Tag, class Data>(Tag, Data&& data) {  | 
 | 174 | +      //       return make_sender_expr<Tag, stream_domain>(  | 
 | 175 | +      //         (Data&&) data, get_completion_scheduler<Tag>(data));  | 
 | 176 | +      //     });  | 
 | 177 | +      // }  | 
 | 178 | +      // template <sender_expr_for<just_t> Sender, class Env>  | 
 | 179 | +      //   requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
 | 180 | +      // auto transform_sender(Sender&& sndr, const Env&) const noexcept {  | 
 | 181 | +      //   return just_sender<__decay_t<Sender>>{(Sender&&) sndr, base()};  | 
 | 182 | +      // }  | 
 | 183 | + | 
 | 184 | +      template <sender_expr_for<bulk_t> Sender, class Env>  | 
 | 185 | +        requires _non_stream_sender<Sender, Env> // no need to transform it a second time  | 
 | 186 | +      auto transform_sender(Sender&& sndr, const Env& env) const noexcept {  | 
 | 187 | +        return stdexec::apply_sender(  | 
 | 188 | +          (Sender&&) sndr, [&]<class Data, class Child>(__ignore, Data&& data, Child&& child) {  | 
 | 189 | +            auto&& [shape, fun] = (Data&&) data;  | 
 | 190 | +            auto next = stdexec::transform_sender(*this, (Child&&) child, env);  | 
 | 191 | +            return bulk_sender_th<decltype(next), decltype(shape), decltype(fun)>{  | 
 | 192 | +              {}, std::move(next), shape, fun};  | 
125 | 193 |           });  | 
126 | 194 |       }  | 
127 | 195 | 
 
  | 
@@ -338,6 +406,10 @@ namespace nvexec {  | 
338 | 406 |       return {base()};  | 
339 | 407 |     }  | 
340 | 408 | 
 
  | 
 | 409 | +    stream_scheduler context_state_t::make_stream_scheduler() const noexcept {  | 
 | 410 | +      return {*this};  | 
 | 411 | +    }  | 
 | 412 | + | 
341 | 413 |     template <stream_completing_sender Sender>  | 
342 | 414 |     void tag_invoke(start_detached_t, Sender&& sndr) noexcept(false) {  | 
343 | 415 |       _submit::submit_t{}((Sender&&) sndr, _start_detached::detached_receiver_t{});  | 
 | 
0 commit comments