forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresult_loop.hh
346 lines (309 loc) · 12.9 KB
/
result_loop.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <iterator>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/coroutine/exception.hh>
#include <type_traits>
#include "utils/result.hh"
namespace utils {
namespace internal {
// Estimates how many elements the range [begin, end) holds, so that a vector
// can be reserved before it is filled from that range.
//
// Parameters:
//   begin, end - the range to measure.
//   category   - a value of the iterator's category tag type; only its type
//                (IteratorCategory) is inspected.
//
// Returns the exact element count (std::distance) when IteratorCategory is,
// or derives from, std::forward_iterator_tag. For any other category
// (e.g. std::input_iterator_tag) it returns 0, because such a range cannot be
// measured without consuming it.
//
// The decision is made on the IteratorCategory type rather than through a
// separate overload taking std::forward_iterator_tag. Such an overload is
// either not deducible (if it carries its own IteratorCategory template
// parameter) or loses overload resolution to this generic template for every
// derived tag, since an exact match beats a derived-to-base conversion.
template<typename Iterator, typename IteratorCategory>
inline size_t iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, [[maybe_unused]] IteratorCategory category) {
    if constexpr (std::is_base_of_v<std::forward_iterator_tag, IteratorCategory>) {
        // May require linear scan, but it's better than reallocation
        return static_cast<size_t>(std::distance(begin, end));
    } else {
        // Return 0 in general case
        return 0;
    }
}
}
// A version of parallel_for_each which understands results.
// In case of a failure, it returns one of the failed results.
//
// Template parameters / arguments:
//   R          - the result type; the requires clause demands that it
//                satisfies ExceptionContainerResult and has a void value_type.
//   begin, end - the range whose elements are passed to func.
//   func       - invoked as func(*it) for each element; the requires clause
//                demands that it returns seastar::future<R>.
//
// Returns a future resolving to a successful R when every invocation
// succeeded. Otherwise it resolves to a failed R, or to an exceptional
// future, taken from one of the invocations that did not succeed.
template<typename R, typename Iterator, typename Func>
requires
ExceptionContainerResult<R>
&& std::is_void_v<typename R::value_type>
&& requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<seastar::future<R>>; }
inline seastar::future<R> result_parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
// Holds only the futures that still need attention: pending ones,
// exceptional ones and ones that resolved to a failed result.
std::vector<seastar::future<R>> futs;
while (begin != end) {
auto f = seastar::futurize_invoke(std::forward<Func>(func), *begin++);
// NOTE(review): presumably guards the vector allocations below against
// allocation failure handling, since this function is noexcept - confirm
// against the seastar::memory documentation.
seastar::memory::scoped_critical_alloc_section _;
if (f.available() && !f.failed()) {
auto res = f.get();
if (res) {
// Completed synchronously and successfully: nothing to remember
continue;
}
// Completed synchronously with a failed result: keep it as a ready
// future so that it is handled like every other stored future
f = seastar::make_ready_future<R>(std::move(res));
}
if (futs.empty()) {
using itraits = std::iterator_traits<Iterator>;
// `begin` was already advanced past the current element, so the
// estimate covers the remaining elements; the +1 is for `f` itself
auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
futs.reserve(n);
}
futs.push_back(std::move(f));
}
if (futs.empty()) {
// Every invocation finished synchronously with success
return seastar::make_ready_future<R>(bo::success());
}
// Use a coroutine so that the waiting task is allocated only once
return ([] (std::vector<seastar::future<R>> futs) noexcept -> seastar::future<R> {
using error_type = typename R::error_type;
// monostate: no failure seen so far; otherwise the error or the exception
// of the most recently awaited failure (each failure overwrites the
// previous one)
std::variant<std::monostate, error_type, std::exception_ptr> result_state;
// All stored futures are awaited, from the back of the vector to the front,
// even after a failure has been recorded
while (!futs.empty()) {
// TODO: Avoid try..catching here if seastar coroutines allow that
// Or not? Explicit checks might be slower on the happy path
try {
auto res = co_await std::move(futs.back());
if (!res) {
result_state = std::move(res).assume_error();
}
} catch (...) {
result_state = std::current_exception();
}
futs.pop_back();
}
if (std::holds_alternative<std::monostate>(result_state)) {
co_return bo::success();
} else if (auto* error = std::get_if<error_type>(&result_state)) {
co_return std::move(*error);
} else {
// Propagate the stored exception through the returned future
co_return seastar::coroutine::exception(
std::get<std::exception_ptr>(std::move(result_state)));
}
})(std::move(futs));
}
// Range-based convenience form of result_parallel_for_each.
// Applies `func` to every element of `range` by delegating to the
// iterator-pair overload; in case of a failure, the returned future carries
// one of the failed results.
template<typename R, typename Range, typename Func>
requires
    ExceptionContainerResult<R>
    && std::is_void_v<typename R::value_type>
inline seastar::future<R> result_parallel_for_each(Range&& range, Func&& func) {
    auto first = std::begin(range);
    auto last = std::end(range);
    return result_parallel_for_each<R>(std::move(first), std::move(last), std::forward<Func>(func));
}
namespace internal {
// Primary template: describes a Reducer for which no more specific
// specialization applies. Such a reducer contributes no value, so the
// reduction result is a void result parameterized by the ExCont error type.
template<typename Reducer, ExceptionContainer ExCont>
struct result_reducer_traits {
    using result_type = bo::result<void, ExCont, exception_container_throw_policy>;

    // The reducer is not consulted; a ready, successful void result is returned.
    static seastar::future<result_type> maybe_call_get(Reducer&&) {
        return seastar::make_ready_future<result_type>(bo::success());
    }
};
// Specialization for reducers that expose a get() method. The value obtained
// from get() (directly, or through the future it returns) becomes the value
// of a successful result whose error type is ExCont.
template<typename Reducer, ExceptionContainer ExCont>
requires requires (Reducer r) {
    { r.get() };
}
struct result_reducer_traits<Reducer, ExCont> {
    using original_type = seastar::futurize_t<decltype(std::declval<Reducer>().get())>;
    using result_type = bo::result<typename original_type::value_type, ExCont, exception_container_throw_policy>;

    // Calls get() on the reducer and wraps whatever it produces into a
    // successful result_type.
    static seastar::future<result_type> maybe_call_get(Reducer&& r) {
        auto outcome = r.get();
        if constexpr (!seastar::Future<decltype(outcome)>) {
            // get() produced a plain value: wrap it right away
            return seastar::make_ready_future<result_type>(bo::success(std::move(outcome)));
        } else {
            // get() produced a future: wrap its value once it resolves
            return outcome.then([] (auto&& value) {
                return seastar::make_ready_future<result_type>(bo::success(std::move(value)));
            });
        }
    }
};
// Adapts a unary Reducer so that it can consume results instead of plain
// values: successful results are unwrapped and forwarded to the reducer,
// while the first failed result is remembered and reported by get().
template<typename Reducer, ExceptionContainer ExCont>
struct result_map_reduce_unary_adapter {
private:
    // The reducer is deliberately stored next to the error rather than inside
    // a result<Reducer>. Turning such a result into a failed one would destroy
    // the Reducer, yet some map_reduce usages may assume that the reducer
    // stays alive until all mapper calls finish (e.g. it may hold a smart
    // pointer to memory used by the mappers). Keeping it separate means it is
    // destroyed only together with this wrapper.
    Reducer _reducer;
    ExCont _excont;
    using reducer_traits = result_reducer_traits<Reducer, ExCont>;
public:
    result_map_reduce_unary_adapter(Reducer&& reducer)
            : _reducer(std::forward<Reducer>(reducer))
    { }

    // Consumes one mapped result.
    template<ExceptionContainerResult Arg>
    seastar::future<> operator()(Arg&& arg) {
        if (_excont) {
            // An error was recorded earlier, so whatever arrives now is ignored
            return seastar::make_ready_future<>();
        }
        if (arg) {
            return seastar::futurize_invoke(_reducer, std::move(arg).assume_value());
        }
        // This is the first failed result: remember its error
        _excont = std::move(arg).assume_error();
        return seastar::make_ready_future<>();
    }

    // Produces the final result: the recorded error if there is one,
    // otherwise whatever the reducer traits derive from the reducer.
    seastar::future<typename reducer_traits::result_type> get() {
        using final_result = typename reducer_traits::result_type;
        if (!_excont) {
            return reducer_traits::maybe_call_get(std::move(_reducer));
        }
        return seastar::make_ready_future<final_result>(bo::failure(std::move(_excont)));
    }
};
}
// map_reduce for mappers that return futures of results, using a unary
// reducer. Each element of [begin, end) is passed to `mapper`; the reducer is
// wrapped in result_map_reduce_unary_adapter so that it receives the values of
// successful results, while a failed result is carried into the final outcome.
template<typename Iterator, typename Mapper, typename Reducer>
requires requires (Iterator i, Mapper mapper, Reducer reduce) {
    *i++;
    { i != i } -> std::convertible_to<bool>;
    { mapper(*i) } -> ExceptionContainerResultFuture<>;
    { seastar::futurize_invoke(reduce, seastar::futurize_invoke(mapper, *i).get().value()) }
            -> std::same_as<seastar::future<>>;
}
inline
auto
result_map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& reducer) {
    // The result type the mapper's future resolves to, and its error container
    using mapped_result = std::remove_reference_t<decltype(seastar::futurize_invoke(mapper, *begin).get())>;
    using error_container = typename mapped_result::error_type;
    using wrapped_reducer = internal::result_map_reduce_unary_adapter<Reducer, error_container>;

    wrapped_reducer adapter(std::forward<Reducer>(reducer));
    return seastar::map_reduce(
            std::move(begin),
            std::move(end),
            std::forward<Mapper>(mapper),
            std::move(adapter));
}
namespace internal {
// Adapts a binary Reducer, which combines plain values, into a callable that
// combines results: Left is the accumulated result, Right is the result
// produced by a mapper.
//
// The private `return_type` alias was dropped together with the two
// value-type aliases that only it used. It was never referenced, and it named
// the std::invoke_result trait itself rather than the type that trait
// computes.
template<ExceptionContainerResult Left, ExceptionContainerResult Right, typename Reducer>
requires ResultRebindableTo<Right, Left>
struct result_map_reduce_binary_adapter {
private:
    Reducer _reducer;
public:
    result_map_reduce_binary_adapter(Reducer&& reducer)
            : _reducer(std::forward<Reducer>(reducer))
    { }

    // Combines the accumulated result with a newly mapped one.
    //  - If `left` already failed, it is returned unchanged and the reducer
    //    is not called.
    //  - Otherwise, if `right` failed, its failure becomes the returned value.
    //  - Otherwise the reducer is called with both values, and its return
    //    value is converted to Left.
    Left operator()(Left&& left, Right&& right) {
        if (!left) {
            return std::move(left);
        }
        if (!right) {
            return std::move(right).as_failure();
        }
        return _reducer(std::move(left).assume_value(), std::move(right).assume_value());
    }
};
}
// map_reduce for mappers that return futures of results, using an initial
// value and a binary reducer. The initial value is wrapped into a result
// (the mapper's result type rebound to Initial), and the reducer is wrapped in
// result_map_reduce_binary_adapter so that it combines results, not values.
template<typename Iterator, typename Mapper, typename Initial, typename Reducer>
requires requires (Iterator i, Mapper mapper, Initial initial, Reducer reduce) {
    *i++;
    { i != i } -> std::convertible_to<bool>;
    { mapper(*i) } -> ExceptionContainerResultFuture<>;
    { reduce(std::move(initial), mapper(*i).get().value()) }
            -> std::convertible_to<rebind_result<Initial, std::remove_reference_t<decltype(mapper(*i).get())>>>;
}
inline
auto
result_map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reducer reduce)
        -> seastar::future<rebind_result<Initial, std::remove_reference_t<decltype(mapper(*begin).get())>>> {
    // Result type produced by the mapper, and the accumulator type derived from it
    using mapped_result = std::remove_reference_t<decltype(mapper(*begin).get())>;
    using accumulator = rebind_result<Initial, mapped_result>;
    using wrapped_reducer = internal::result_map_reduce_binary_adapter<accumulator, mapped_result, Reducer>;

    return seastar::map_reduce(
            std::move(begin),
            std::move(end),
            std::forward<Mapper>(mapper),
            accumulator(std::move(initial)),
            wrapped_reducer(std::move(reduce)));
}
// A result-aware loop: invokes `action` repeatedly until `stop_cond` returns
// true or an invocation does not succeed.
//
// Arguments:
//   stop_cond - callable returning bool; checked before each invocation of
//               `action`.
//   action    - callable whose (futurized) return type is a future of a
//               result; the static_assert below requires the result's
//               value_type to be void.
//
// Returns a future of the action's result type: success once stop_cond()
// returns true, or the first failed result / exceptional future produced by
// `action`.
template<typename AsyncAction, typename StopCondition>
requires requires (StopCondition cond, AsyncAction act) {
{ cond() } -> std::same_as<bool>;
{ seastar::futurize_invoke(act) } -> ExceptionContainerResultFuture<>;
}
inline
auto
result_do_until(StopCondition stop_cond, AsyncAction action) {
// TODO: Constrain the result of act() better
using future_type = seastar::futurize_t<std::invoke_result_t<AsyncAction>>;
using result_type = typename future_type::value_type;
static_assert(std::is_void_v<typename result_type::value_type>,
"The result type of the action must be future<result<void>>");
// Fast path: keep iterating in this plain function for as long as every
// action completes immediately and seastar::need_preempt() is false
for (;;) {
try {
if (stop_cond()) {
return seastar::make_ready_future<result_type>(bo::success());
}
} catch (...) {
// An exception thrown by stop_cond is reported through the returned future
return seastar::current_exception_as_future<result_type>();
}
auto f = seastar::futurize_invoke(action);
if (f.available() && !seastar::need_preempt()) {
if (f.failed()) {
// Exceptional future: hand it to the caller as is
return f;
}
result_type&& res = f.get();
if (!res) {
// Failed result: stop and report it
return seastar::make_ready_future<result_type>(std::move(res));
}
} else {
// Slow path: the action is still pending or need_preempt() returned true.
// The rest of the loop runs in a coroutine which takes over `f`,
// `stop_cond` and `action`.
return ([] (future_type f, StopCondition stop_cond, AsyncAction action) -> seastar::future<result_type> {
for (;;) {
// No need to manually maybe_yield because co_await does that for us
result_type res = co_await std::move(f);
if (!res) {
co_return res;
}
// NOTE(review): stop_cond() is not wrapped in try/catch here, unlike
// on the fast path; presumably the coroutine turns a thrown exception
// into a failed future - confirm.
if (stop_cond()) {
co_return bo::success();
}
f = seastar::futurize_invoke(action);
}
})(std::move(f), std::move(stop_cond), std::move(action));
}
}
}
// A result-aware loop: invokes `action` repeatedly until it yields
// seastar::stop_iteration::yes or an invocation does not succeed.
//
// Arguments:
//   action - callable whose (futurized) return type is a future of a result;
//            the static_assert below requires the result's value_type to be
//            seastar::stop_iteration.
//
// Returns a future of the action's result type rebound to void
// (rebind_result<void, ...>): success once the action yields
// stop_iteration::yes, the failure of the first failed result, or an
// exceptional future carrying the exception of the first exceptional one.
template<typename AsyncAction>
requires requires (AsyncAction act) {
{ seastar::futurize_invoke(act) } -> ExceptionContainerResultFuture<>;
}
inline
auto result_repeat(AsyncAction&& action) noexcept {
using future_type = seastar::futurize_t<std::invoke_result_t<AsyncAction>>;
using result_type = typename future_type::value_type;
static_assert(std::is_same_v<seastar::stop_iteration, typename result_type::value_type>, "bad AsyncAction signature");
using return_result_type = rebind_result<void, result_type>;
// Fast path: keep iterating in this plain function for as long as every
// action completes immediately and seastar::need_preempt() is false
for (;;) {
auto f = seastar::futurize_invoke(action);
if (f.available() && !seastar::need_preempt()) {
if (f.failed()) {
// The future types differ (stop_iteration vs. void value), so the
// exception is moved into a new future of the return type
return seastar::make_exception_future<return_result_type>(f.get_exception());
}
result_type&& res = f.get();
if (!res) {
// Failed result: convert it to a failure of the return type
return seastar::make_ready_future<return_result_type>(std::move(res).as_failure());
}
if (res.value() == seastar::stop_iteration::yes) {
return seastar::make_ready_future<return_result_type>(bo::success());
}
} else {
// Slow path: the action is still pending or need_preempt() returned true.
// The rest of the loop runs in a coroutine which takes over `f` and `action`.
// NOTE(review): AsyncAction is deduced from a forwarding reference; if it
// were an lvalue reference type, std::move(action) below would presumably
// not bind to the coroutine's `AsyncAction action` parameter - confirm that
// callers only pass rvalues.
return ([] (future_type f, AsyncAction action) -> seastar::future<return_result_type> {
for (;;) {
// No need to manually maybe_yield because co_await does that for us
auto&& res = co_await std::move(f);
if (!res) {
co_return std::move(res).as_failure();
}
if (res.value() == seastar::stop_iteration::yes) {
co_return bo::success();
}
f = seastar::futurize_invoke(action);
}
})(std::move(f), std::move(action));
}
}
}
}