Skip to content

Refactors reader_op to simplify sans-io. #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 53 additions & 50 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,79 +252,82 @@ struct reader_op {

Conn* conn_;
Logger logger_;
std::pair<tribool, std::size_t> res_{std::make_pair(std::make_optional(false), 0)};
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
asio::coroutine coro{};

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
ignore_unused(n);

BOOST_ASIO_CORO_REENTER(coro) for (;;)
{
// Appends some data to the buffer if necessary.
if (!res_.first.has_value() || conn_->mpx_.is_data_needed()) {
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
async_append_some(
conn_->next_layer(),
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
async_append_some(
conn_->next_layer().next_layer(),
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
}

logger_.on_read(ec, n);

// The connection is not viable after an error.
if (ec) {
logger_.trace("reader_op (1)", ec);
conn_->cancel(operation::run);
self.complete(ec);
return;
}

// Somebody might have canceled implicitly or explicitly
// while we were suspended and after queueing so we have to
// check.
if (!conn_->is_open()) {
logger_.trace("reader_op (2): connection is closed.");
self.complete(ec);
return;
}
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
async_append_some(
conn_->next_layer(),
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
async_append_some(
conn_->next_layer().next_layer(),
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
}

res_ = conn_->mpx_.commit_read(ec);
logger_.on_read(ec, n);

// The connection is not viable after an error.
if (ec) {
logger_.trace("reader_op (3)", ec);
logger_.trace("reader_op (1)", ec);
conn_->cancel(operation::run);
self.complete(ec);
return;
}

if (res_.first.has_value() && res_.first.value()) {
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
BOOST_ASIO_CORO_YIELD
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
}
// The connection might have been canceled while this op was
// suspended or after queueing so we have to check.
if (!conn_->is_open()) {
logger_.trace("reader_op (2): connection is closed.");
self.complete(ec);
return;
}

while (!conn_->mpx_.get_read_buffer().empty()) {
res_ = conn_->mpx_.consume_next(ec);

if (ec) {
logger_.trace("reader_op (4)", ec);
logger_.trace("reader_op (3)", ec);
conn_->cancel(operation::run);
self.complete(ec);
return;
}

if (!conn_->is_open()) {
logger_.trace("reader_op (5): connection is closed.");
self.complete(asio::error::operation_aborted);
return;
if (!res_.first.has_value()) {
// More data is needed.
break;
}

if (res_.first.value()) {
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
BOOST_ASIO_CORO_YIELD
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
}

if (ec) {
logger_.trace("reader_op (4)", ec);
conn_->cancel(operation::run);
self.complete(ec);
return;
}

if (!conn_->is_open()) {
logger_.trace("reader_op (5): connection is closed.");
self.complete(asio::error::operation_aborted);
return;
}
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions include/boost/redis/detail/multiplexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ struct multiplexer {
// they don't have a response e.g. SUBSCRIBE.
auto commit_write() -> std::size_t;

// If the tribool contains no value more data is needed, otherwise
// if the value is true the message consumed is a push.
[[nodiscard]]
auto commit_read(system::error_code& ec) -> std::pair<tribool, std::size_t>;
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;

auto add(std::shared_ptr<elem> const& ptr) -> void;
auto reset() -> void;
Expand Down Expand Up @@ -150,9 +152,9 @@ struct multiplexer {
}

[[nodiscard]]
auto is_data_needed() const noexcept -> bool
auto get_read_buffer() const noexcept -> std::string const&
{
return std::empty(read_buffer_);
return read_buffer_;
}

// TODO: Change signature to receive an adapter instead of a
Expand Down
2 changes: 1 addition & 1 deletion include/boost/redis/impl/multiplexer.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
}
}

std::pair<tribool, std::size_t> multiplexer::commit_read(system::error_code& ec)
std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec)
{
// We arrive here in two states:
//
Expand Down
10 changes: 5 additions & 5 deletions test/test_low_level_sync_sans_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n";

boost::system::error_code ec;
auto const ret = mpx.commit_read(ec);
auto const ret = mpx.consume_next(ec);

BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16u);
Expand All @@ -286,12 +286,12 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
mpx.get_read_buffer() = ">2\r\n+one\r";

boost::system::error_code ec;
auto ret = mpx.commit_read(ec);
auto ret = mpx.consume_next(ec);

BOOST_TEST(!ret.first.has_value());

mpx.get_read_buffer().append("\n+two\r\n");
ret = mpx.commit_read(ec);
ret = mpx.consume_next(ec);

BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16u);
Expand Down Expand Up @@ -381,9 +381,9 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
// Simulates a socket read by putting some data in the read buffer.
mpx.get_read_buffer().append("+one\r\n");

// Informs the multiplexer the read operation is concluded.
// Consumes the next message in the read buffer.
boost::system::error_code ec;
auto const ret = mpx.commit_read(ec);
auto const ret = mpx.consume_next(ec);

// The read operation should have been successfull.
BOOST_TEST(ret.first.has_value());
Expand Down