-
Notifications
You must be signed in to change notification settings - Fork 7
Description
https://www.rfc-editor.org/rfc/rfc9621.html#name-transport-services-api-mode
+-----------------------------------------------------+
| Application |
+-----------------------------------------------------+
|
+-----------------------------------------------------+
| Transport Services API |
+-----------------------------------------------------+
|
+-----------------------------------------------------+
| Transport Services Implementation |
| (Using DNS, UDP, TCP, SCTP, DCCP, TLS, QUIC, etc.) |
+-----------------------------------------------------+
|
+-----------------------------------------------------+
| Network-Layer Interface |
+-----------------------------------------------------+
Feasibility of Network Service Layering
Both events and IO operations are asynchronous and correspond to std::execution::operation_state_comcept. Therefore, any local implementation of rfc9621 should be completed at operation-state, using sender to describe the work. Rfc9621 intentionally layers to simplify network IO processing, specifically interpreting data through services and establishing data migration channels for different services through events. Any data migration only needs to care about how many buffers are available. Perhaps we can abstract the system IO service layer and focus on the start and completion status for any asynchronous operation. Then, any advanced protocol parsing service or advanced IO system service only needs to exchange data with the lowest level system IO service layer.
Taking the lowest level system network IO as an example
Network IO operations only require these. This is an abstract result type
struct io_operation_context_base
{
::SOCKET socket{INVALID_SOCKET};
::WSABUF wsabuf{};
::DWORD bytes_transferred{};
};
The allowed IO operations can be limited and abstracted uniformly through enumeration types
enum class io_type : std::uint8_t
{
IO_UNKNOWN,
IO_ACCEPT,
IO_READ,
IO_WRITE
};
struct operation_callback_base
{
using callback_fun_t = void (*)(iocp_operation_base *self) noexcept;
using callback_error_t = void (*)(iocp_operation_base *self,
std::error_code err_code) noexcept;
callback_fun_t complete{};
callback_error_t complete_error{};
};
struct iocp_operation_base : ::OVERLAPPED
{
explicit constexpr iocp_operation_base(io::io_type type,
operation_callback_base callback,
io_operation_context_base &ctx) noexcept
: OVERLAPPED{}, op_type{type}, callbacks{callback}, context{ctx}
{
assert(Internal == 0);
assert(Offset == 0); // NOLINT
assert(OffsetHigh == 0); // NOLINT
assert(hEvent == nullptr);
}
io_type op_type; // NOLINT
operation_callback_base callbacks; // NOLINT
io_operation_context_base &context; // NOLINT
};
template <typename impl_type>
struct iocp_operation : iocp_operation_base
{
explicit constexpr iocp_operation(io_operation_context_base &context) noexcept
: iocp_operation_base{impl_type::io_type,
operation_callback_base{
.complete = &iocp_operation::invoke,
.complete_error = &iocp_operation::invoke_error},
context}
{
assert(this->context.socket != INVALID_SOCKET);
}
private:
constexpr static void invoke(iocp_operation_base *self) noexcept
{
/**
NOTE: Undefined behavior may be
struct B {};
struct D : B { B b };
D d;
B& br1 = d;
B& br2 = d.b;
static_cast<D&>(br1); // OK
static_cast<D&>(br2); // Undefined behavior
*/
impl_type::complete(static_cast<impl_type *>(self));
}
// NOLINTNEXTLINE
constexpr static void invoke_error(iocp_operation_base *self,
std::error_code code) noexcept
{
impl_type::complete_error(static_cast<impl_type *>(self), code);
}
};
Different network services provide different delivery operations
using os_sockaddr_storage_in_type = ::sockaddr_in6;
static constexpr auto SOCKADDRIN_SIZE = sizeof(os_sockaddr_storage_in_type); // NOLINT
// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex#:~:text=%5Bin%5D-,dwLocalAddressLength,-The%20number%20of
static constexpr auto ADDRESS_BUFFER_SIZE = SOCKADDRIN_SIZE + 16; // NOLINT
template <typename T>
concept iocp_context = requires(const T &ctx) {
{ ctx.iocp_handle() } noexcept -> std::convertible_to<::HANDLE>;
{ ctx.listen_socket() } noexcept -> std::convertible_to<::SOCKET>;
{ ctx.pfn_acceptex() } noexcept -> std::convertible_to<::LPFN_ACCEPTEX>;
};
struct iocp_context_base
{
struct endpoint_info
{
using port_type = std::uint_least16_t;
std::string ip_address; // NOLINT
port_type port; // NOLINT
[[nodiscard]] std::string to_string() const noexcept // NOLINT
{
return ip_address + ":" + std::to_string(port);
}
};
struct connection_info
{
endpoint_info local; // NOLINT
endpoint_info remote; // NOLINT
[[nodiscard]] std::string to_string() const noexcept // NOLINT
{
return "Local: " + local.to_string() + ", Remote: " + remote.to_string();
}
};
struct rawconnection
{
::SOCKET socket{INVALID_SOCKET};
connection_info info;
};
static constexpr connection_info parse_connection( // NOLINT
LPFN_GETACCEPTEXSOCKADDRS pfnGetAcceptExSockaddrs, void *buffer) noexcept
{
sockaddr *local_addr = nullptr;
sockaddr *remote_addr = nullptr;
int local_len = 0;
int remote_len = 0;
pfnGetAcceptExSockaddrs(buffer,
0,
ADDRESS_BUFFER_SIZE, ADDRESS_BUFFER_SIZE, &local_addr,
&local_len, &remote_addr, &remote_len);
return {.local = parse_endpoint(local_addr),
.remote = parse_endpoint(remote_addr)};
}
static constexpr std::string ip_to_string(const sockaddr *addr) noexcept // NOLINT
{
char buffer[INET6_ADDRSTRLEN]{}; // NOLINT
if (addr->sa_family == AF_INET)
{
const auto *a = reinterpret_cast<const sockaddr_in *>(addr);
::inet_ntop(AF_INET, &a->sin_addr, buffer, sizeof(buffer));
}
else if (addr->sa_family == AF_INET6)
{
const auto *a = reinterpret_cast<const sockaddr_in6 *>(addr);
::inet_ntop(AF_INET6, &a->sin6_addr, buffer, sizeof(buffer));
}
return buffer;
}
// NOLINTNEXTLINE
static constexpr endpoint_info parse_endpoint(const sockaddr *addr) noexcept
{
if (addr == nullptr)
return {.ip_address = "", .port = 0};
if (addr->sa_family == AF_INET6)
{
const auto *addr6 = reinterpret_cast<const sockaddr_in6 *>(addr);
if (::IN6_IS_ADDR_V4MAPPED(&addr6->sin6_addr))
{
sockaddr_in addr4{};
addr4.sin_family = AF_INET;
addr4.sin_port = addr6->sin6_port;
::memcpy(&addr4.sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
return {.ip_address =
ip_to_string(reinterpret_cast<const sockaddr *>(&addr4)),
.port = ::ntohs(addr4.sin_port)};
}
}
if (addr->sa_family == AF_INET)
{
const auto *a = reinterpret_cast<const sockaddr_in *>(addr);
return {.ip_address = ip_to_string(addr), .port = ::ntohs(a->sin_port)};
}
if (addr->sa_family == AF_INET6)
{
const auto *a = reinterpret_cast<const sockaddr_in6 *>(addr);
return {.ip_address = ip_to_string(addr), .port = ::ntohs(a->sin6_port)};
}
return {.ip_address = "Unknown", .port = 0};
}
static constexpr auto get_acceptex_ptr(::SOCKET listen_socket) // NOLINT
{
::LPFN_ACCEPTEX pfn = nullptr;
::GUID guidAcceptEx = WSAID_ACCEPTEX;
// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex#:~:text=lib%20library.%0A%20%20%20%20iResult%20%3D-,WSAIoctl,-(ListenSocket%2C%20SIO_GET_EXTENSION_FUNCTION_POINTER%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%26GuidAcceptEx
if (::DWORD bytes = 0;
SOCKET_ERROR == ::WSAIoctl(listen_socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx, sizeof(guidAcceptEx),
reinterpret_cast<::LPVOID>(&pfn), sizeof(pfn),
&bytes, nullptr, nullptr))
throw std::system_error(::WSAGetLastError(), std::system_category());
return pfn;
}
static constexpr auto get_accpetex_sockaddrs_ptr(::SOCKET listen_socket) // NOLINT
{
LPFN_GETACCEPTEXSOCKADDRS pfnGetAcceptExSockaddrs{};
GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
if (DWORD bytes = 0;
SOCKET_ERROR ==
::WSAIoctl(listen_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs),
&pfnGetAcceptExSockaddrs, sizeof(pfnGetAcceptExSockaddrs),
&bytes, nullptr, nullptr))
throw std::system_error(::WSAGetLastError(), std::system_category());
return pfnGetAcceptExSockaddrs;
}
template <iocp_context Service> // NOLINTNEXTLINE
static constexpr auto post_accept(const Service &service,
iocp_operation_base *op) noexcept
-> std::expected<void, std::error_code>
{
auto &ctx = op->context;
DWORD bytes = 0;
// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex
// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex#:~:text=be%20NULL.-,Return%20value,-If%20no%20error
if (FALSE == service.pfn_acceptex()(service.listen_socket(), ctx.socket,
ctx.wsabuf.buf, 0, ADDRESS_BUFFER_SIZE,
ADDRESS_BUFFER_SIZE, &bytes,
static_cast<::OVERLAPPED *>(op)))
{
if (const ::DWORD k_error = ::WSAGetLastError();
k_error != ERROR_IO_PENDING)
return std::unexpected(std::error_code(static_cast<int>(k_error),
std::system_category()));
}
return {};
}
static constexpr auto post_read(iocp_operation_base *op) noexcept // NOLINT
-> std::expected<void, std::error_code>
{
auto &ctx = op->context;
DWORD flags = 0;
// https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsarecv#:~:text=for%20nonoverlapped%20sockets).-,Return%20value,-If%20no%20error
if (SOCKET_ERROR == ::WSARecv(ctx.socket, &ctx.wsabuf, 1, nullptr, &flags,
static_cast<::OVERLAPPED *>(op), nullptr))
{
if (const ::DWORD k_error = ::WSAGetLastError();
k_error != ERROR_IO_PENDING)
return std::unexpected(std::error_code(static_cast<int>(k_error),
std::system_category()));
}
return {};
}
static constexpr auto post_write(iocp_operation_base *op) noexcept // NOLINT
-> std::expected<void, std::error_code>
{
auto &ctx = op->context;
// https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsasend#:~:text=for%20nonoverlapped%20sockets.-,Return%20value,-If%20no%20error
if (SOCKET_ERROR == ::WSASend(ctx.socket, &ctx.wsabuf, 1, nullptr, 0,
static_cast<::OVERLAPPED *>(op), nullptr))
{
if (const ::DWORD k_error = ::WSAGetLastError();
k_error != ERROR_IO_PENDING)
return std::unexpected(std::error_code(static_cast<int>(k_error),
std::system_category()));
}
return {};
}
};
Then you can integrate the std:: execution to describe the service
template <class InternetProtocol>
struct base_service : io::windows::iocp_context_base
{
using endpoint_type = InternetProtocol::endpoint;
template <typename Sndr, typename Revr>
struct state : io::windows::iocp_operation<state<Sndr, Revr>>
{
static constexpr auto io_type = Sndr::type; // NOLINT
using operation_state_concept = ::std::execution::operation_state_t;
base_service &service; // NOLINT
Revr rcvr; // NOLINT
io::windows::io_operation_context_base operation_context; // NOLINT
constexpr state(Sndr &sndr, Revr &&rcvr) noexcept // NOLINT
: io::windows::iocp_operation<state<Sndr, Revr>>{operation_context},
service{sndr.service}, rcvr{std::forward<Revr>(rcvr)},
operation_context{sndr.operation_context}
{
}
~state() noexcept = default;
state(state &&) = delete;
state(const state &) = delete;
state &operator=(state &&) = delete;
state &operator=(const state &) = delete;
void start() & noexcept
{
if constexpr (io_type == io::io_type::IO_ACCEPT)
{
if (auto ret = iocp_context_base::post_accept(service, this); !ret)
::std::execution::recv::set_error(std::move(rcvr), ret.error());
}
else if constexpr (io_type == io::io_type::IO_READ)
{
if (auto ret = iocp_context_base::post_read(this); !ret)
::std::execution::recv::set_error(std::move(rcvr), ret.error());
}
else if constexpr (io_type == io::io_type::IO_WRITE)
{
if (auto ret = iocp_context_base::post_write(this); !ret)
::std::execution::recv::set_error(std::move(rcvr), ret.error());
}
else
{
std::println("check operation_type. io::io_type is IO_UNKNOWN");
std::terminate();
}
}
constexpr static void complete(state *self) noexcept
{
::std::execution::recv::set_value(std::move(self->rcvr),
std::move(self->context));
}
// NOLINTNEXTLINE
constexpr static void complete_error(state *self,
std::error_code code) noexcept
{
::std::execution::recv::set_error(std::move(self->rcvr), std::move(code));
}
};
template <io::io_type io_type>
struct sender
{
using sender_concept = ::std::execution::sender_t;
using completion_signatures = ::std::execution::completion_signatures<
::std::execution::set_value_t(
::std::net::io::windows::io_operation_context_base),
::std::execution::set_error_t(std::error_code)>;
using indices_for = std::index_sequence_for<>;
static constexpr auto type = io_type; // NOLINT
template <typename Recv>
constexpr auto connect(Recv recv) noexcept -> state<sender, Recv>
{
return {*this, std::move(recv)};
}
base_service &service; // NOLINT
io::windows::io_operation_context_base operation_context;
};
static_assert(::std::execution::sender<sender<io::io_type::IO_ACCEPT>>);
[[nodiscard]] auto iocp_handle() const noexcept // NOLINT
{
return iocp_;
}
[[nodiscard]] auto listen_socket() const noexcept // NOLINT
{
return listenSocket_;
}
[[nodiscard]] auto pfn_acceptex() const noexcept // NOLINT
{
return pfnAcceptEx_;
}
[[nodiscard]] constexpr auto make_accept( // NOLINT
io::windows::io_operation_context_base context) noexcept
{
return sender<io::io_type::IO_ACCEPT>{*this, std::move(context)};
}
[[nodiscard]] constexpr auto make_read( // NOLINT
io::windows::io_operation_context_base context) noexcept
{
return sender<io::io_type::IO_READ>{*this, std::move(context)};
}
[[nodiscard]] constexpr auto make_write( // NOLINT
io::windows::io_operation_context_base context) noexcept
{
return sender<io::io_type::IO_WRITE>{*this, std::move(context)};
}
[[nodiscard]] constexpr auto make_rawconnection( // NOLINT
io::windows::io_operation_context_base context) noexcept
{
return make_accept(context) |
::std::execution::then([this](auto new_ctx) noexcept {
return rawconnection{new_ctx.socket, connection_info(new_ctx)};
});
}
[[nodiscard]] constexpr auto make_raw_socket() const noexcept // NOLINT
{
auto ret = ::WSASocketW(
endpoint_.protocol().family(), endpoint_.protocol().type(),
endpoint_.protocol().protocol(), nullptr, 0, WSA_FLAG_OVERLAPPED);
assert(ret != INVALID_SOCKET);
return ret;
}
[[nodiscard]] constexpr auto family() const noexcept
{
return endpoint_.protocol().family();
}
[[nodiscard]] constexpr auto connection_info(
const io::windows::io_operation_context_base &context) const noexcept
{
return iocp_context_base::parse_connection(pfnGetAcceptExSockaddrs_,
context.wsabuf.buf);
}
constexpr explicit base_service(endpoint_type endpoint) : endpoint_{endpoint}
{
//
if (WSADATA wsaData; 0 != ::WSAStartup(MAKEWORD(2, 2), &wsaData))
throw std::system_error(::WSAGetLastError(), std::system_category());
try
{
iocp_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, {}, {});
if (iocp_ == nullptr || iocp_ == INVALID_HANDLE_VALUE)
throw std::system_error(static_cast<int>(::GetLastError()),
std::system_category());
listenSocket_ = make_raw_socket();
if (listenSocket_ == INVALID_SOCKET)
throw std::system_error(::WSAGetLastError(), std::system_category());
sockaddr_storage addr = {};
int addrLen = 0;
if (endpoint.protocol().family() == AF_INET)
{
auto &ipv4_addr = reinterpret_cast<sockaddr_in &>(addr);
ipv4_addr.sin_family = AF_INET;
ipv4_addr.sin_addr.s_addr = ::htonl(INADDR_ANY);
ipv4_addr.sin_port = ::htons(endpoint.port());
addrLen = sizeof(sockaddr_in);
}
else if (endpoint.protocol().family() == AF_INET6)
{
auto &ipv6_addr = reinterpret_cast<sockaddr_in6 &>(addr);
ipv6_addr.sin6_family = AF_INET6;
ipv6_addr.sin6_addr = in6addr_any; // IPv6
ipv6_addr.sin6_port = ::htons(endpoint.port());
addrLen = sizeof(sockaddr_in6);
}
else
{
throw std::runtime_error("Unsupported address family");
}
if (::bind(listenSocket_, reinterpret_cast<sockaddr *>(&addr), addrLen) ==
SOCKET_ERROR)
{
throw std::system_error(::WSAGetLastError(), std::system_category());
}
if (::listen(listenSocket_, SOMAXCONN) == SOCKET_ERROR)
throw std::system_error(::WSAGetLastError(), std::system_category());
pfnAcceptEx_ = iocp_context_base::get_acceptex_ptr(listenSocket_);
pfnGetAcceptExSockaddrs_ =
iocp_context_base::get_accpetex_sockaddrs_ptr(listenSocket_);
if (nullptr ==
::CreateIoCompletionPort(reinterpret_cast<HANDLE>(listenSocket_),
iocp_, {}, {}))
throw std::system_error(static_cast<int>(::GetLastError()),
std::system_category());
std::println("service start ok, [port: {}]", endpoint.port());
}
catch (...)
{
std::println("********base_service Construction [failed]********");
deinit();
std::rethrow_exception(std::current_exception());
}
}
void run() noexcept
{
std::println("run service port: [{}] .....", endpoint_.port());
while (true)
{
DWORD bytes_transferred = 0;
ULONG_PTR completion_key = 0;
OVERLAPPED *overlapped = nullptr;
DWORD last_error = (TRUE == ::GetQueuedCompletionStatus(
iocp_, &bytes_transferred,
&completion_key, &overlapped, INFINITE))
? ERROR_SUCCESS
: ::GetLastError();
if (finish_.load(std::memory_order_relaxed))
{
std::println("service shutdown....");
break;
}
if (overlapped == nullptr)
{
std::println(
"overlapped == nullptr and GetQueuedCompletionStatus error: {}",
last_error);
continue;
}
auto *op = static_cast<io::windows::iocp_operation_base *>(overlapped);
if (last_error != ERROR_SUCCESS)
op->callbacks.complete_error(
op, std::error_code(static_cast<int>(::GetLastError()),
std::system_category()));
switch (op->op_type)
{
case io::io_type::IO_ACCEPT: {
::SOCKET client_socket = op->context.socket;
// UPDATE_ACCEPT_CONTEXT
::setsockopt(client_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
reinterpret_cast<char *>(&listenSocket_),
sizeof(listenSocket_));
if (nullptr ==
::CreateIoCompletionPort(reinterpret_cast<HANDLE>(client_socket),
iocp_, 0, 0))
{
op->callbacks.complete_error(
op, std::error_code(static_cast<int>(::GetLastError()),
std::system_category()));
continue;
}
op->context.bytes_transferred = bytes_transferred;
op->callbacks.complete(op);
}
break;
case io::io_type::IO_READ:
case io::io_type::IO_WRITE:
op->context.bytes_transferred = bytes_transferred;
op->callbacks.complete(op);
break;
default:
std::println("io::io_type: [ {} ] can not handle",
static_cast<int>(op->op_type));
std::terminate();
}
}
}
constexpr void shutdown() noexcept
{
this->finish_.store(true, std::memory_order::memory_order_release);
::PostQueuedCompletionStatus(iocp_, 0, 0, nullptr);
}
[[nodiscard]] constexpr bool is_stopped() const noexcept // NOLINT
{
return finish_.load(std::memory_order_relaxed);
}
constexpr ~base_service() noexcept
{
deinit();
}
base_service(const base_service &) = delete;
base_service(base_service &&) = delete;
base_service &operator=(const base_service &) = delete;
base_service &operator=(base_service &&) = delete;
private:
endpoint_type endpoint_;
HANDLE iocp_{INVALID_HANDLE_VALUE};
SOCKET listenSocket_{INVALID_SOCKET};
LPFN_ACCEPTEX pfnAcceptEx_{};
LPFN_GETACCEPTEXSOCKADDRS pfnGetAcceptExSockaddrs_{};
std::atomic<bool> finish_;
constexpr void deinit() noexcept
{
if (listenSocket_ != INVALID_SOCKET)
{
::closesocket(listenSocket_);
listenSocket_ = INVALID_SOCKET;
}
if (iocp_ != INVALID_HANDLE_VALUE)
{
::CloseHandle(iocp_);
iocp_ = INVALID_HANDLE_VALUE;
}
::WSACleanup();
}
};
Complete some services using underlying services.Common services can be named and provided by default.DNS, UDP, TCP, SCTP, DCCP, TLS, QUIC, etc.
ip::tcp::endpoint ep(ip::address_v4::any(), 8080);
net::services::windows::base_service<ip::tcp> service(ep);
ex::counting_scope scope;
ex::static_thread_pool<1> pool;
ex::spawn(
ex::starts_on(
pool.scheduler(),
[](auto &service) noexcept -> ex::task<bool> {
int times = 1;
while (times-- > 0)
{
char buffer[2 *io::windows::ADDRESS_BUFFER_SIZE];
io::windows::io_operation_context_base p{
service.make_raw_socket(),
{.len = 2 * io::windows::ADDRESS_BUFFER_SIZE,
.buf = buffer},
{}};
// test copy
auto p2 = p;
assert(p.socket == p2.socket);
assert(p.wsabuf.buf == p2.wsabuf.buf);
assert(p.wsabuf.len == p2.wsabuf.len);
auto connect = co_await service.make_rawconnection(p);
assert(connect.socket == p.socket);
std::println("New connection: {}", connect.info.to_string());
std::println("Remote IP: {}", connect.info.remote.ip_address);
std::println("Remote Port: {}", connect.info.remote.port);
std::println("Local Port: {}", connect.info.local.port);
{
char buffer[4096];
auto ctx = co_await service.make_read(
{connect.socket, {.len = 4096, .buf = buffer}});
std::println("bytes_transferred: {} \n read: {}",
ctx.bytes_transferred,
std::string(buffer, ctx.bytes_transferred));
}
{
std::string responseBody = "Hello World!";
std::string response = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: " +
std::to_string(responseBody.length()) +
"\r\n"
"Connection: " +
(false ? "keep-alive" : "close") +
"\r\n"
"\r\n"
+ responseBody;
auto ctx = co_await service.make_write(
{connect.socket,
{.len = static_cast<::ULONG>(response.size()),
.buf = response.data()}});
std::println("write bytes_transferred: {} , response.size(): {} ",
ctx.bytes_transferred, response.size());
}
}
service.shutdown();
co_return true;
}(service) | ex::then([&](auto ret) noexcept {
std::println("task done: {}", ret);
})),
scope.get_token());
service.run();
std::this_thread::sync_wait(scope.join());
std::cout << "main done\n";