Example
An example application that uses forque can be found in the ./src/app/ directory. It uses dynamic tags with several producers and simple consumers that just output the consumed items to the console.
The example includes a rather simple thread pool implementation that provides the execution context for the consumers' and producers' coroutines, but it is outside the scope of this article.
The first thing we do is select the tag type and configure the runque and forque types:
using tag_type = frq::dtag<>; // dynamic tag
// type returned by 'reserve' call
using reservation_type = frq::reservation<item>;
// type returned by 'get' call
using retainment_type = frq::retainment<item>;
using runque_type = frq::make_runque_t<frq::fifo_order,
                                       frq::coro_thread_model,
                                       retainment_type,
                                       std::allocator<retainment_type>>;
using queue_type = frq::forque<item, runque_type, tag_type>;
The generate_tag function shows how to generate a dynamic tag of arbitrary size:
auto tag_size = tag_size_dist(rng);
while (tag_size-- > 0) {
  auto node = frq::make_dtag_node<int>(std::allocator<frq::dtag_node>{},
                                       frq::default_hash_compare{},
                                       tag_value_dist(rng));
  tag_values.push_back(std::move(node));
}
return tag_type{begin(tag_values), end(tag_values)};
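The snippet above depends on names defined elsewhere in the example. As a rough, library-free sketch of the same idea, the code below assumes that `tag_size_dist` and `tag_value_dist` are uniform integer distributions and uses a plain `std::vector<int>` as a stand-in for `tag_type`. The name `generate_sketch_tag`, the alias `sketch_tag` and the distribution ranges are illustrative assumptions, not part of forque.

```cpp
#include <cassert>
#include <random>
#include <vector>

// Stand-in for frq::dtag<>: an ordered list of integer levels (assumption).
using sketch_tag = std::vector<int>;

// Builds a tag with a random number of levels, each holding a random value.
// The distribution ranges are illustrative, not taken from the example app.
template<typename Rng>
sketch_tag generate_sketch_tag(Rng& rng) {
  std::uniform_int_distribution<int> tag_size_dist{1, 3};
  std::uniform_int_distribution<int> tag_value_dist{0, 2};

  sketch_tag tag_values;
  auto tag_size = tag_size_dist(rng);
  while (tag_size-- > 0) {
    tag_values.push_back(tag_value_dist(rng));
  }
  return tag_values;
}
```

Calling `generate_sketch_tag` with a `std::mt19937` yields tags of one to three levels, so different items may share a prefix of their tags.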
The produce function is a coroutine that produces items in two phases:
auto tag = generate_tag(rng);
// phase 1
auto item = co_await queue.reserve(tag);
// ...
co_await p.yield();
// ...
// phase 2
co_await item.release({tag, value});
In the first phase we reserve a place for the item in the queue. In the second phase we populate the item with the actual value, which makes it available for consumption.
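To make the two phases concrete, here is a small synchronous model of a queue in which a slot is reserved first and populated later. It is only a sketch: `two_phase_queue` and its members are hypothetical, it ignores tags and coroutines, and the choice to hold back later items until the oldest reserved slot is populated is an assumption of this model rather than a statement about forque's internals.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>

// Library-free model of a two-phase queue (not the forque implementation).
class two_phase_queue {
public:
  // Phase 1: reserve a place; the returned id identifies the slot.
  std::size_t reserve() {
    slots_.emplace_back(); // empty slot, not yet consumable
    return base_ + slots_.size() - 1;
  }

  // Phase 2: populate the reserved slot, making it consumable.
  void release(std::size_t id, int value) {
    slots_[id - base_] = value;
  }

  // Returns the oldest item only if it has already been populated.
  std::optional<int> try_get() {
    if (slots_.empty() || !slots_.front().has_value()) {
      return std::nullopt;
    }
    auto value = slots_.front();
    slots_.pop_front();
    ++base_;
    return value;
  }

private:
  std::deque<std::optional<int>> slots_;
  std::size_t base_ = 0; // id of the slot currently at the front
};
```

In this model a consumer never observes a reserved but unpopulated slot, which mirrors the statement that an item becomes available only after the second phase.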
The call to yield puts the rest of the coroutine back at the end of the thread pool's queue. Since production is done in a loop without any waiting, this gives consumers an opportunity to start consuming items before the loop ends.
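The effect of yielding can be illustrated without real coroutines. The sketch below models the thread pool as a single-threaded FIFO of callables, and models a yield as posting "the rest of the loop" to the back of that queue. The names `run_queue`, `produce_steps` and `consume_steps` are hypothetical and the example's actual thread pool is not shown here, so treat this as an approximation of the scheduling behaviour only.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Single-threaded stand-in for the thread pool's work queue (assumption).
class run_queue {
public:
  void post(std::function<void()> work) { pending_.push_back(std::move(work)); }

  // Runs queued work in FIFO order until nothing is left.
  void run() {
    while (!pending_.empty()) {
      auto work = std::move(pending_.front());
      pending_.pop_front();
      work();
    }
  }

private:
  std::deque<std::function<void()>> pending_;
};

// One loop iteration, then the remainder is re-posted to the back of the
// queue, which is how the yield call is described as behaving.
inline void produce_steps(run_queue& pool, std::vector<std::string>& log,
                          int remaining) {
  if (remaining == 0) return;
  log.push_back("produce");
  pool.post([&pool, &log, remaining] { produce_steps(pool, log, remaining - 1); });
}

inline void consume_steps(run_queue& pool, std::vector<std::string>& log,
                          int remaining) {
  if (remaining == 0) return;
  log.push_back("consume");
  pool.post([&pool, &log, remaining] { consume_steps(pool, log, remaining - 1); });
}
```

Because each step re-queues itself behind whatever is already pending, a producer and a consumer posted to the same `run_queue` take turns instead of the producer running its whole loop first.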
Each producer generates a certain number of items, and after the last producer has finished production it initiates shutdown of the forque:
if (--producers == 0) {
  co_await queue.interrupt();
}
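The check above only elects a single producer if the decrement is atomic. The declaration of `producers` is not shown here, so the following sketch assumes it is a `std::atomic<int>` initialised to the number of producers; `is_last_producer` is a hypothetical helper introduced for illustration.

```cpp
#include <atomic>
#include <cassert>

// Called by a producer after it has produced its last item.
// Returns true for exactly one caller: the one that brings the count to zero.
inline bool is_last_producer(std::atomic<int>& producers) {
  // Pre-decrement on std::atomic is a single read-modify-write operation,
  // so two producers finishing at the same time cannot both observe zero.
  return --producers == 0;
}
```

With a plain `int` shared between threads the same expression would be a data race, and the shutdown could be triggered more than once or not at all.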
The consume function is a coroutine that consumes ready items from the forque:
try {
  // phase 1
  auto item = co_await queue.get();
  // ...
  co_await p.yield();
  // ...
  // phase 2
  co_await item.finalize();
}
catch (frq::interrupted&) {
  break;
}
As with the producer, there are two phases. In the first phase we wait for and obtain an item from the queue, and in the second phase, after processing is done, the item is released by calling finalize.
The yield call in the middle simulates asynchronous processing by putting the rest of the coroutine at the end of the thread pool's queue.
Processing is wrapped in a try/catch block so that the consumption coroutine can be aborted after the shutdown of the forque.
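The control flow of the consumer loop can be sketched with an ordinary blocking-style queue. Everything here is a stand-in: `sketch_queue`, `sketch_interrupted` and `consume_all` are hypothetical names, and the choice to deliver the remaining items before throwing is an assumption of this sketch, not a documented property of forque.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for frq::interrupted.
struct sketch_interrupted : std::runtime_error {
  sketch_interrupted() : std::runtime_error{"queue interrupted"} {}
};

// Hypothetical synchronous queue; the real forque is awaited instead.
class sketch_queue {
public:
  void push(int value) { items_.push_back(value); }
  void interrupt() { interrupted_ = true; }

  // Returns the next item, or throws once interrupted and drained.
  int get() {
    if (items_.empty()) {
      if (interrupted_) throw sketch_interrupted{};
      throw std::logic_error{"this sketch cannot wait for items"};
    }
    int value = items_.front();
    items_.pop_front();
    return value;
  }

private:
  std::deque<int> items_;
  bool interrupted_ = false;
};

// Consumes until the queue signals interruption, then leaves the loop.
inline std::vector<int> consume_all(sketch_queue& queue) {
  std::vector<int> consumed;
  while (true) {
    try {
      consumed.push_back(queue.get());
    }
    catch (sketch_interrupted const&) {
      break; // exits the while loop, as in the consume coroutine
    }
  }
  return consumed;
}
```

Using an exception for shutdown keeps the normal path of the loop free of status checks; the consumer only needs one handler that ends the loop.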