Skip to content

Commit e3ed5f0

Browse files
committed
Add single-buffer optimisation for send and receive.
When we can determine at compile time that the user has supplied a single buffer, use send/recv rather than sendmsg/recvmsg, as the former system calls can be faster.
1 parent bfe960d commit e3ed5f0

6 files changed

+323
-22
lines changed

include/boost/asio/detail/buffer_sequence_adapter.hpp

+14
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ class buffer_sequence_adapter
105105
: buffer_sequence_adapter_base
106106
{
107107
public:
108+
enum { is_single_buffer = false };
109+
108110
explicit buffer_sequence_adapter(const Buffers& buffer_sequence)
109111
: count_(0), total_buffer_size_(0)
110112
{
@@ -246,6 +248,8 @@ class buffer_sequence_adapter<Buffer, boost::asio::mutable_buffer>
246248
: buffer_sequence_adapter_base
247249
{
248250
public:
251+
enum { is_single_buffer = true };
252+
249253
explicit buffer_sequence_adapter(
250254
const boost::asio::mutable_buffer& buffer_sequence)
251255
{
@@ -306,6 +310,8 @@ class buffer_sequence_adapter<Buffer, boost::asio::const_buffer>
306310
: buffer_sequence_adapter_base
307311
{
308312
public:
313+
enum { is_single_buffer = true };
314+
309315
explicit buffer_sequence_adapter(
310316
const boost::asio::const_buffer& buffer_sequence)
311317
{
@@ -368,6 +374,8 @@ class buffer_sequence_adapter<Buffer, boost::asio::mutable_buffers_1>
368374
: buffer_sequence_adapter_base
369375
{
370376
public:
377+
enum { is_single_buffer = true };
378+
371379
explicit buffer_sequence_adapter(
372380
const boost::asio::mutable_buffers_1& buffer_sequence)
373381
{
@@ -428,6 +436,8 @@ class buffer_sequence_adapter<Buffer, boost::asio::const_buffers_1>
428436
: buffer_sequence_adapter_base
429437
{
430438
public:
439+
enum { is_single_buffer = true };
440+
431441
explicit buffer_sequence_adapter(
432442
const boost::asio::const_buffers_1& buffer_sequence)
433443
{
@@ -490,6 +500,8 @@ class buffer_sequence_adapter<Buffer, boost::array<Elem, 2> >
490500
: buffer_sequence_adapter_base
491501
{
492502
public:
503+
enum { is_single_buffer = false };
504+
493505
explicit buffer_sequence_adapter(
494506
const boost::array<Elem, 2>& buffer_sequence)
495507
{
@@ -560,6 +572,8 @@ class buffer_sequence_adapter<Buffer, std::array<Elem, 2> >
560572
: buffer_sequence_adapter_base
561573
{
562574
public:
575+
enum { is_single_buffer = false };
576+
563577
explicit buffer_sequence_adapter(
564578
const std::array<Elem, 2>& buffer_sequence)
565579
{

include/boost/asio/detail/impl/socket_ops.ipp

+217
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,38 @@ signed_size_type recv(socket_type s, buf* bufs, size_t count,
788788
#endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
789789
}
790790

791+
signed_size_type recv1(socket_type s, void* data, size_t size,
792+
int flags, boost::system::error_code& ec)
793+
{
794+
clear_last_error();
795+
#if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
796+
// Receive some data.
797+
WSABUF buf;
798+
buf.buf = const_cast<char*>(static_cast<const char*>(data));
799+
buf.len = static_cast<ULONG>(size);
800+
DWORD bytes_transferred = 0;
801+
DWORD recv_flags = flags;
802+
int result = error_wrapper(::WSARecv(s, &buf, 1,
803+
&bytes_transferred, &recv_flags, 0, 0), ec);
804+
if (ec.value() == ERROR_NETNAME_DELETED)
805+
ec = boost::asio::error::connection_reset;
806+
else if (ec.value() == ERROR_PORT_UNREACHABLE)
807+
ec = boost::asio::error::connection_refused;
808+
else if (ec.value() == WSAEMSGSIZE || ec.value() == ERROR_MORE_DATA)
809+
result = 0;
810+
if (result != 0)
811+
return socket_error_retval;
812+
ec = boost::system::error_code();
813+
return bytes_transferred;
814+
#else // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
815+
signed_size_type result = error_wrapper(::recv(s,
816+
static_cast<char*>(data), size, flags), ec);
817+
if (result >= 0)
818+
ec = boost::system::error_code();
819+
return result;
820+
#endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
821+
}
822+
791823
size_t sync_recv(socket_type s, state_type state, buf* bufs,
792824
size_t count, int flags, bool all_empty, boost::system::error_code& ec)
793825
{
@@ -833,6 +865,51 @@ size_t sync_recv(socket_type s, state_type state, buf* bufs,
833865
}
834866
}
835867

868+
size_t sync_recv1(socket_type s, state_type state, void* data,
869+
size_t size, int flags, boost::system::error_code& ec)
870+
{
871+
if (s == invalid_socket)
872+
{
873+
ec = boost::asio::error::bad_descriptor;
874+
return 0;
875+
}
876+
877+
// A request to read 0 bytes on a stream is a no-op.
878+
if (size == 0 && (state & stream_oriented))
879+
{
880+
ec = boost::system::error_code();
881+
return 0;
882+
}
883+
884+
// Read some data.
885+
for (;;)
886+
{
887+
// Try to complete the operation without blocking.
888+
signed_size_type bytes = socket_ops::recv1(s, data, size, flags, ec);
889+
890+
// Check if operation succeeded.
891+
if (bytes > 0)
892+
return bytes;
893+
894+
// Check for EOF.
895+
if ((state & stream_oriented) && bytes == 0)
896+
{
897+
ec = boost::asio::error::eof;
898+
return 0;
899+
}
900+
901+
// Operation failed.
902+
if ((state & user_set_non_blocking)
903+
|| (ec != boost::asio::error::would_block
904+
&& ec != boost::asio::error::try_again))
905+
return 0;
906+
907+
// Wait for socket to become ready.
908+
if (socket_ops::poll_read(s, 0, -1, ec) < 0)
909+
return 0;
910+
}
911+
}
912+
836913
#if defined(BOOST_ASIO_HAS_IOCP)
837914

838915
void complete_iocp_recv(state_type state,
@@ -905,6 +982,44 @@ bool non_blocking_recv(socket_type s,
905982
}
906983
}
907984

985+
bool non_blocking_recv1(socket_type s,
986+
void* data, size_t size, int flags, bool is_stream,
987+
boost::system::error_code& ec, size_t& bytes_transferred)
988+
{
989+
for (;;)
990+
{
991+
// Read some data.
992+
signed_size_type bytes = socket_ops::recv1(s, data, size, flags, ec);
993+
994+
// Check for end of stream.
995+
if (is_stream && bytes == 0)
996+
{
997+
ec = boost::asio::error::eof;
998+
return true;
999+
}
1000+
1001+
// Retry operation if interrupted by signal.
1002+
if (ec == boost::asio::error::interrupted)
1003+
continue;
1004+
1005+
// Check if we need to run the operation again.
1006+
if (ec == boost::asio::error::would_block
1007+
|| ec == boost::asio::error::try_again)
1008+
return false;
1009+
1010+
// Operation is complete.
1011+
if (bytes >= 0)
1012+
{
1013+
ec = boost::system::error_code();
1014+
bytes_transferred = bytes;
1015+
}
1016+
else
1017+
bytes_transferred = 0;
1018+
1019+
return true;
1020+
}
1021+
}
1022+
9081023
#endif // defined(BOOST_ASIO_HAS_IOCP)
9091024

9101025
signed_size_type recvfrom(socket_type s, buf* bufs, size_t count,
@@ -1188,6 +1303,39 @@ signed_size_type send(socket_type s, const buf* bufs, size_t count,
11881303
#endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
11891304
}
11901305

1306+
signed_size_type send1(socket_type s, const void* data, size_t size,
1307+
int flags, boost::system::error_code& ec)
1308+
{
1309+
clear_last_error();
1310+
#if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
1311+
// Send the data.
1312+
WSABUF buf;
1313+
buf.buf = const_cast<char*>(static_cast<const char*>(data));
1314+
buf.len = static_cast<ULONG>(size);
1315+
DWORD bytes_transferred = 0;
1316+
DWORD send_flags = flags;
1317+
int result = error_wrapper(::WSASend(s, &buf, 1,
1318+
&bytes_transferred, send_flags, 0, 0), ec);
1319+
if (ec.value() == ERROR_NETNAME_DELETED)
1320+
ec = boost::asio::error::connection_reset;
1321+
else if (ec.value() == ERROR_PORT_UNREACHABLE)
1322+
ec = boost::asio::error::connection_refused;
1323+
if (result != 0)
1324+
return socket_error_retval;
1325+
ec = boost::system::error_code();
1326+
return bytes_transferred;
1327+
#else // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
1328+
#if defined(__linux__)
1329+
flags |= MSG_NOSIGNAL;
1330+
#endif // defined(__linux__)
1331+
signed_size_type result = error_wrapper(::send(s,
1332+
static_cast<const char*>(data), size, flags), ec);
1333+
if (result >= 0)
1334+
ec = boost::system::error_code();
1335+
return result;
1336+
#endif // defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
1337+
}
1338+
11911339
size_t sync_send(socket_type s, state_type state, const buf* bufs,
11921340
size_t count, int flags, bool all_empty, boost::system::error_code& ec)
11931341
{
@@ -1226,6 +1374,44 @@ size_t sync_send(socket_type s, state_type state, const buf* bufs,
12261374
}
12271375
}
12281376

1377+
size_t sync_send1(socket_type s, state_type state, const void* data,
1378+
size_t size, int flags, boost::system::error_code& ec)
1379+
{
1380+
if (s == invalid_socket)
1381+
{
1382+
ec = boost::asio::error::bad_descriptor;
1383+
return 0;
1384+
}
1385+
1386+
// A request to write 0 bytes to a stream is a no-op.
1387+
if (size == 0 && (state & stream_oriented))
1388+
{
1389+
ec = boost::system::error_code();
1390+
return 0;
1391+
}
1392+
1393+
// Read some data.
1394+
for (;;)
1395+
{
1396+
// Try to complete the operation without blocking.
1397+
signed_size_type bytes = socket_ops::send1(s, data, size, flags, ec);
1398+
1399+
// Check if operation succeeded.
1400+
if (bytes >= 0)
1401+
return bytes;
1402+
1403+
// Operation failed.
1404+
if ((state & user_set_non_blocking)
1405+
|| (ec != boost::asio::error::would_block
1406+
&& ec != boost::asio::error::try_again))
1407+
return 0;
1408+
1409+
// Wait for socket to become ready.
1410+
if (socket_ops::poll_write(s, 0, -1, ec) < 0)
1411+
return 0;
1412+
}
1413+
}
1414+
12291415
#if defined(BOOST_ASIO_HAS_IOCP)
12301416

12311417
void complete_iocp_send(
@@ -1279,6 +1465,37 @@ bool non_blocking_send(socket_type s,
12791465
}
12801466
}
12811467

1468+
bool non_blocking_send1(socket_type s,
1469+
const void* data, size_t size, int flags,
1470+
boost::system::error_code& ec, size_t& bytes_transferred)
1471+
{
1472+
for (;;)
1473+
{
1474+
// Write some data.
1475+
signed_size_type bytes = socket_ops::send1(s, data, size, flags, ec);
1476+
1477+
// Retry operation if interrupted by signal.
1478+
if (ec == boost::asio::error::interrupted)
1479+
continue;
1480+
1481+
// Check if we need to run the operation again.
1482+
if (ec == boost::asio::error::would_block
1483+
|| ec == boost::asio::error::try_again)
1484+
return false;
1485+
1486+
// Operation is complete.
1487+
if (bytes >= 0)
1488+
{
1489+
ec = boost::system::error_code();
1490+
bytes_transferred = bytes;
1491+
}
1492+
else
1493+
bytes_transferred = 0;
1494+
1495+
return true;
1496+
}
1497+
}
1498+
12821499
#endif // defined(BOOST_ASIO_HAS_IOCP)
12831500

12841501
signed_size_type sendto(socket_type s, const buf* bufs, size_t count,

include/boost/asio/detail/reactive_socket_recv_op.hpp

+19-6
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,26 @@ class reactive_socket_recv_op_base : public reactor_op
4949
reactive_socket_recv_op_base* o(
5050
static_cast<reactive_socket_recv_op_base*>(base));
5151

52-
buffer_sequence_adapter<boost::asio::mutable_buffer,
53-
MutableBufferSequence> bufs(o->buffers_);
52+
typedef buffer_sequence_adapter<boost::asio::mutable_buffer,
53+
MutableBufferSequence> bufs_type;
5454

55-
status result = socket_ops::non_blocking_recv(o->socket_,
56-
bufs.buffers(), bufs.count(), o->flags_,
57-
(o->state_ & socket_ops::stream_oriented) != 0,
58-
o->ec_, o->bytes_transferred_) ? done : not_done;
55+
status result;
56+
if (bufs_type::is_single_buffer)
57+
{
58+
result = socket_ops::non_blocking_recv1(o->socket_,
59+
bufs_type::first(o->buffers_).data(),
60+
bufs_type::first(o->buffers_).size(), o->flags_,
61+
(o->state_ & socket_ops::stream_oriented) != 0,
62+
o->ec_, o->bytes_transferred_) ? done : not_done;
63+
}
64+
else
65+
{
66+
bufs_type bufs(o->buffers_);
67+
result = socket_ops::non_blocking_recv(o->socket_,
68+
bufs.buffers(), bufs.count(), o->flags_,
69+
(o->state_ & socket_ops::stream_oriented) != 0,
70+
o->ec_, o->bytes_transferred_) ? done : not_done;
71+
}
5972

6073
if (result == done)
6174
if ((o->state_ & socket_ops::stream_oriented) != 0)

include/boost/asio/detail/reactive_socket_send_op.hpp

+25-8
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,34 @@ class reactive_socket_send_op_base : public reactor_op
4949
reactive_socket_send_op_base* o(
5050
static_cast<reactive_socket_send_op_base*>(base));
5151

52-
buffer_sequence_adapter<boost::asio::const_buffer,
53-
ConstBufferSequence> bufs(o->buffers_);
52+
typedef buffer_sequence_adapter<boost::asio::const_buffer,
53+
ConstBufferSequence> bufs_type;
5454

55-
status result = socket_ops::non_blocking_send(o->socket_,
56-
bufs.buffers(), bufs.count(), o->flags_,
55+
status result;
56+
if (bufs_type::is_single_buffer)
57+
{
58+
result = socket_ops::non_blocking_send1(o->socket_,
59+
bufs_type::first(o->buffers_).data(),
60+
bufs_type::first(o->buffers_).size(), o->flags_,
5761
o->ec_, o->bytes_transferred_) ? done : not_done;
5862

59-
if (result == done)
60-
if ((o->state_ & socket_ops::stream_oriented) != 0)
61-
if (o->bytes_transferred_ < bufs.total_size())
62-
result = done_and_exhausted;
63+
if (result == done)
64+
if ((o->state_ & socket_ops::stream_oriented) != 0)
65+
if (o->bytes_transferred_ < bufs_type::first(o->buffers_).size())
66+
result = done_and_exhausted;
67+
}
68+
else
69+
{
70+
bufs_type bufs(o->buffers_);
71+
result = socket_ops::non_blocking_send(o->socket_,
72+
bufs.buffers(), bufs.count(), o->flags_,
73+
o->ec_, o->bytes_transferred_) ? done : not_done;
74+
75+
if (result == done)
76+
if ((o->state_ & socket_ops::stream_oriented) != 0)
77+
if (o->bytes_transferred_ < bufs.total_size())
78+
result = done_and_exhausted;
79+
}
6380

6481
BOOST_ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_send",
6582
o->ec_, o->bytes_transferred_));

0 commit comments

Comments
 (0)