diff --git a/Makefile.am b/Makefile.am
index 8a258d5bc..62c51ba53 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -321,9 +321,12 @@ include_bitcoin_network_config_HEADERS = \
include_bitcoin_network_distributorsdir = ${includedir}/bitcoin/network/distributors
include_bitcoin_network_distributors_HEADERS = \
include/bitcoin/network/distributors/distributor.hpp \
+ include/bitcoin/network/distributors/distributor_bitcoind.hpp \
+ include/bitcoin/network/distributors/distributor_electrum.hpp \
include/bitcoin/network/distributors/distributor_http.hpp \
include/bitcoin/network/distributors/distributor_peer.hpp \
include/bitcoin/network/distributors/distributor_rpc.hpp \
+ include/bitcoin/network/distributors/distributor_stratum_v1.hpp \
include/bitcoin/network/distributors/distributors.hpp
include_bitcoin_network_impl_asyncdir = ${includedir}/bitcoin/network/impl/async
diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj
index b2be88449..b4798f171 100644
--- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj
+++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj
@@ -260,9 +260,12 @@
+
+
+
diff --git a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters
index ecd696558..dde8ebb75 100644
--- a/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters
+++ b/builds/msvc/vs2022/libbitcoin-network/libbitcoin-network.vcxproj.filters
@@ -512,6 +512,12 @@
include\bitcoin\network\distributors
+
+ include\bitcoin\network\distributors
+
+
+ include\bitcoin\network\distributors
+
include\bitcoin\network\distributors
@@ -521,6 +527,9 @@
include\bitcoin\network\distributors
+
+ include\bitcoin\network\distributors
+
include\bitcoin\network\distributors
diff --git a/include/bitcoin/network.hpp b/include/bitcoin/network.hpp
index fc7a0fb30..a043af2a2 100644
--- a/include/bitcoin/network.hpp
+++ b/include/bitcoin/network.hpp
@@ -51,9 +51,12 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
+#include
#include
#include
#include
diff --git a/include/bitcoin/network/async/desubscriber.hpp b/include/bitcoin/network/async/desubscriber.hpp
index 4a7e8b33e..33f1ac73f 100644
--- a/include/bitcoin/network/async/desubscriber.hpp
+++ b/include/bitcoin/network/async/desubscriber.hpp
@@ -33,7 +33,8 @@ template
class desubscriber final
{
public:
- DELETE_COPY_MOVE(desubscriber);
+ DELETE_COPY(desubscriber);
+ DEFAULT_MOVE(desubscriber);
using key = Key;
typedef std::function handler;
diff --git a/include/bitcoin/network/async/subscriber.hpp b/include/bitcoin/network/async/subscriber.hpp
index f44ed9187..cecf403a3 100644
--- a/include/bitcoin/network/async/subscriber.hpp
+++ b/include/bitcoin/network/async/subscriber.hpp
@@ -31,7 +31,8 @@ template
class subscriber final
{
public:
- DELETE_COPY_MOVE(subscriber);
+ DELETE_COPY(subscriber);
+ DEFAULT_MOVE(subscriber);
typedef std::function handler;
diff --git a/include/bitcoin/network/async/unsubscriber.hpp b/include/bitcoin/network/async/unsubscriber.hpp
index 504c4f81b..2fdcf8b0b 100644
--- a/include/bitcoin/network/async/unsubscriber.hpp
+++ b/include/bitcoin/network/async/unsubscriber.hpp
@@ -34,7 +34,8 @@ template
class unsubscriber final
{
public:
- DELETE_COPY_MOVE(unsubscriber);
+ DELETE_COPY(unsubscriber);
+ DEFAULT_MOVE(unsubscriber);
typedef std::function handler;
diff --git a/include/bitcoin/network/distributors/distributor.hpp b/include/bitcoin/network/distributors/distributor.hpp
index 3dce08333..ee773fe1a 100644
--- a/include/bitcoin/network/distributors/distributor.hpp
+++ b/include/bitcoin/network/distributors/distributor.hpp
@@ -19,11 +19,69 @@
#ifndef LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_HPP
#define LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_HPP
+#include
+#include
#include
+// TODO: move this.
+
namespace libbitcoin {
namespace network {
+namespace rpc {
+
+BC_PUSH_WARNING(NO_UNSAFE_COPY_N)
+BC_PUSH_WARNING(NO_ARRAY_TO_POINTER_DECAY)
+
+enum class group { positional, named, either };
+
+/// Non-type template parameter (NTTP) dynamically defines a name for type.
+template
+struct method_name
+{
+ constexpr method_name(const char (&text)[Length])
+ {
+ std::copy_n(text, Length, name);
+ }
+
+ char name[Length]{};
+ static constexpr auto length = sub1(Length);
+};
+
+template
+struct method
+{
+ static constexpr std::string_view name{ Unique.name, Unique.length };
+
+ using tag = method;
+ using args = std::tuple;
+ using names_t = std::array;
+
+ /// Required for construction of tag{}.
+ constexpr method() NOEXCEPT
+ : names_{}
+ {
+ }
+
+ template = true>
+ constexpr method(ParameterNames&&... names) NOEXCEPT
+ : names_{ std::forward(names)... }
+ {
+ }
+
+ constexpr const names_t& names() const NOEXCEPT
+ {
+ return names_;
+ }
+
+private:
+ const names_t names_;
+};
+
+BC_POP_WARNING()
+BC_POP_WARNING()
+} // namespace rpc
} // namespace network
} // namespace libbitcoin
diff --git a/include/bitcoin/network/distributors/distributor_bitcoind.hpp b/include/bitcoin/network/distributors/distributor_bitcoind.hpp
new file mode 100644
index 000000000..96f8082c4
--- /dev/null
+++ b/include/bitcoin/network/distributors/distributor_bitcoind.hpp
@@ -0,0 +1,44 @@
+/**
+ * Copyright (c) 2011-2025 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifndef LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_BITCOIND_HPP
+#define LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_BITCOIND_HPP
+
+#include
+#include
+
+namespace libbitcoin {
+namespace network {
+
+struct bitcoind
+{
+ static constexpr std::tuple methods
+ {
+ rpc::method<"get_version">{},
+ rpc::method<"add_element", int, int>{ "a", "b" },
+ };
+
+ using type = decltype(methods);
+ static constexpr auto size = std::tuple_size_v;
+ static constexpr rpc::group mode = rpc::group::either;
+};
+
+} // namespace network
+} // namespace libbitcoin
+
+#endif
diff --git a/include/bitcoin/network/distributors/distributor_electrum.hpp b/include/bitcoin/network/distributors/distributor_electrum.hpp
new file mode 100644
index 000000000..77c4df6c2
--- /dev/null
+++ b/include/bitcoin/network/distributors/distributor_electrum.hpp
@@ -0,0 +1,30 @@
+/**
+ * Copyright (c) 2011-2025 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifndef LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_ELECTRUM_HPP
+#define LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_ELECTRUM_HPP
+
+#include
+
+namespace libbitcoin {
+namespace network {
+
+} // namespace network
+} // namespace libbitcoin
+
+#endif
diff --git a/include/bitcoin/network/distributors/distributor_http.hpp b/include/bitcoin/network/distributors/distributor_http.hpp
index 2bc0ae5a8..dcb408fca 100644
--- a/include/bitcoin/network/distributors/distributor_http.hpp
+++ b/include/bitcoin/network/distributors/distributor_http.hpp
@@ -21,7 +21,6 @@
#include
#include
-#include
#include
#include
@@ -30,13 +29,11 @@ namespace network {
#define SUBSCRIBER(name) name##_subscriber_
#define SUBSCRIBER_TYPE(name) name##_subscriber
-#define DECLARE_SUBSCRIBER(name) SUBSCRIBER_TYPE(name) SUBSCRIBER(name)
-#define HANDLER(name) distributor_http::handler
-#define DEFINE_SUBSCRIBER(name) \
- using SUBSCRIBER_TYPE(name) = subscriber
-#define SUBSCRIBER_OVERLOAD(name) \
- inline code do_subscribe(HANDLER(name)&& handler) NOEXCEPT \
- { return SUBSCRIBER(name).subscribe(std::forward(handler)); }
+#define DECLARE_SUBSCRIBER(name) \
+ using SUBSCRIBER_TYPE(name) = subscriber; \
+ SUBSCRIBER_TYPE(name) SUBSCRIBER(name); \
+ inline code do_subscribe(SUBSCRIBER_TYPE(name)::handler&& handler) NOEXCEPT \
+ { return SUBSCRIBER(name).subscribe(std::move(handler)); }
/// Not thread safe.
class BCT_API distributor_http
@@ -48,16 +45,6 @@ class BCT_API distributor_http
DELETE_COPY_MOVE_DESTRUCT(distributor_http);
- DEFINE_SUBSCRIBER(get);
- DEFINE_SUBSCRIBER(head);
- DEFINE_SUBSCRIBER(post);
- DEFINE_SUBSCRIBER(put);
- DEFINE_SUBSCRIBER(delete_);
- DEFINE_SUBSCRIBER(trace);
- DEFINE_SUBSCRIBER(options);
- DEFINE_SUBSCRIBER(connect);
- DEFINE_SUBSCRIBER(unknown);
-
/// Create an instance of this class.
distributor_http(asio::strand& strand) NOEXCEPT;
@@ -88,34 +75,21 @@ class BCT_API distributor_http
subscriber.notify(ec, method);
}
- SUBSCRIBER_OVERLOAD(get);
- SUBSCRIBER_OVERLOAD(head);
- SUBSCRIBER_OVERLOAD(post);
- SUBSCRIBER_OVERLOAD(put);
- SUBSCRIBER_OVERLOAD(delete_);
- SUBSCRIBER_OVERLOAD(trace);
- SUBSCRIBER_OVERLOAD(options);
- SUBSCRIBER_OVERLOAD(connect);
- SUBSCRIBER_OVERLOAD(unknown);
-
// These are thread safe.
- DECLARE_SUBSCRIBER(get);
- DECLARE_SUBSCRIBER(head);
- DECLARE_SUBSCRIBER(post);
- DECLARE_SUBSCRIBER(put);
- DECLARE_SUBSCRIBER(delete_);
- DECLARE_SUBSCRIBER(trace);
- DECLARE_SUBSCRIBER(options);
- DECLARE_SUBSCRIBER(connect);
- DECLARE_SUBSCRIBER(unknown);
+ DECLARE_SUBSCRIBER(get)
+ DECLARE_SUBSCRIBER(head)
+ DECLARE_SUBSCRIBER(post)
+ DECLARE_SUBSCRIBER(put)
+ DECLARE_SUBSCRIBER(delete_)
+ DECLARE_SUBSCRIBER(trace)
+ DECLARE_SUBSCRIBER(options)
+ DECLARE_SUBSCRIBER(connect)
+ DECLARE_SUBSCRIBER(unknown)
};
#undef SUBSCRIBER
#undef SUBSCRIBER_TYPE
#undef DECLARE_SUBSCRIBER
-#undef HANDLER
-#undef DEFINE_SUBSCRIBER
-#undef SUBSCRIBER_OVERLOAD
} // namespace network
} // namespace libbitcoin
diff --git a/include/bitcoin/network/distributors/distributor_peer.hpp b/include/bitcoin/network/distributors/distributor_peer.hpp
index 1a3e27532..2dcfaafc7 100644
--- a/include/bitcoin/network/distributors/distributor_peer.hpp
+++ b/include/bitcoin/network/distributors/distributor_peer.hpp
@@ -30,13 +30,11 @@ namespace network {
#define SUBSCRIBER(name) name##_subscriber_
#define SUBSCRIBER_TYPE(name) name##_subscriber
-#define DECLARE_SUBSCRIBER(name) SUBSCRIBER_TYPE(name) SUBSCRIBER(name)
-#define DEFINE_SUBSCRIBER(name) using SUBSCRIBER_TYPE(name) = \
- unsubscriber
-#define SUBSCRIBER_OVERLOAD(name) inline code do_subscribe( \
- distributor_peer::handler&& handler) NOEXCEPT \
- { return SUBSCRIBER(name).subscribe(std::forward< \
- distributor_peer::handler>(handler)); }
+#define DECLARE_SUBSCRIBER(name) \
+ using SUBSCRIBER_TYPE(name) = unsubscriber; \
+ SUBSCRIBER_TYPE(name) SUBSCRIBER(name); \
+ inline code do_subscribe(SUBSCRIBER_TYPE(name)::handler&& handler) NOEXCEPT \
+ { return SUBSCRIBER(name).subscribe(std::move(handler)); }
/// Not thread safe.
class BCT_API distributor_peer
@@ -49,42 +47,6 @@ class BCT_API distributor_peer
DELETE_COPY_MOVE_DESTRUCT(distributor_peer);
- DEFINE_SUBSCRIBER(address);
- DEFINE_SUBSCRIBER(alert);
- DEFINE_SUBSCRIBER(block);
- DEFINE_SUBSCRIBER(bloom_filter_add);
- DEFINE_SUBSCRIBER(bloom_filter_clear);
- DEFINE_SUBSCRIBER(bloom_filter_load);
- DEFINE_SUBSCRIBER(client_filter);
- DEFINE_SUBSCRIBER(client_filter_checkpoint);
- DEFINE_SUBSCRIBER(client_filter_headers);
- DEFINE_SUBSCRIBER(compact_block);
- DEFINE_SUBSCRIBER(compact_transactions);
- DEFINE_SUBSCRIBER(fee_filter);
- DEFINE_SUBSCRIBER(get_address);
- DEFINE_SUBSCRIBER(get_blocks);
- DEFINE_SUBSCRIBER(get_client_filter_checkpoint);
- DEFINE_SUBSCRIBER(get_client_filter_headers);
- DEFINE_SUBSCRIBER(get_client_filters);
- DEFINE_SUBSCRIBER(get_compact_transactions);
- DEFINE_SUBSCRIBER(get_data);
- DEFINE_SUBSCRIBER(get_headers);
- DEFINE_SUBSCRIBER(headers);
- DEFINE_SUBSCRIBER(inventory);
- DEFINE_SUBSCRIBER(memory_pool);
- DEFINE_SUBSCRIBER(merkle_block);
- DEFINE_SUBSCRIBER(not_found);
- DEFINE_SUBSCRIBER(ping);
- DEFINE_SUBSCRIBER(pong);
- DEFINE_SUBSCRIBER(reject);
- DEFINE_SUBSCRIBER(send_address_v2);
- DEFINE_SUBSCRIBER(send_compact);
- DEFINE_SUBSCRIBER(send_headers);
- DEFINE_SUBSCRIBER(transaction);
- DEFINE_SUBSCRIBER(version);
- DEFINE_SUBSCRIBER(version_acknowledge);
- DEFINE_SUBSCRIBER(witness_tx_id_relay);
-
/// Create an instance of this class.
distributor_peer(memory& memory, asio::strand& strand) NOEXCEPT;
@@ -127,78 +89,42 @@ class BCT_API distributor_peer
return error::success;
}
- SUBSCRIBER_OVERLOAD(address);
- SUBSCRIBER_OVERLOAD(alert);
- SUBSCRIBER_OVERLOAD(block);
- SUBSCRIBER_OVERLOAD(bloom_filter_add);
- SUBSCRIBER_OVERLOAD(bloom_filter_clear);
- SUBSCRIBER_OVERLOAD(bloom_filter_load);
- SUBSCRIBER_OVERLOAD(client_filter);
- SUBSCRIBER_OVERLOAD(client_filter_checkpoint);
- SUBSCRIBER_OVERLOAD(client_filter_headers);
- SUBSCRIBER_OVERLOAD(compact_block);
- SUBSCRIBER_OVERLOAD(compact_transactions);
- SUBSCRIBER_OVERLOAD(fee_filter);
- SUBSCRIBER_OVERLOAD(get_address);
- SUBSCRIBER_OVERLOAD(get_blocks);
- SUBSCRIBER_OVERLOAD(get_client_filter_checkpoint);
- SUBSCRIBER_OVERLOAD(get_client_filter_headers);
- SUBSCRIBER_OVERLOAD(get_client_filters);
- SUBSCRIBER_OVERLOAD(get_compact_transactions);
- SUBSCRIBER_OVERLOAD(get_data);
- SUBSCRIBER_OVERLOAD(get_headers);
- SUBSCRIBER_OVERLOAD(headers);
- SUBSCRIBER_OVERLOAD(inventory);
- SUBSCRIBER_OVERLOAD(memory_pool);
- SUBSCRIBER_OVERLOAD(merkle_block);
- SUBSCRIBER_OVERLOAD(not_found);
- SUBSCRIBER_OVERLOAD(ping);
- SUBSCRIBER_OVERLOAD(pong);
- SUBSCRIBER_OVERLOAD(reject);
- SUBSCRIBER_OVERLOAD(send_address_v2);
- SUBSCRIBER_OVERLOAD(send_compact);
- SUBSCRIBER_OVERLOAD(send_headers);
- SUBSCRIBER_OVERLOAD(transaction);
- SUBSCRIBER_OVERLOAD(version);
- SUBSCRIBER_OVERLOAD(version_acknowledge);
- SUBSCRIBER_OVERLOAD(witness_tx_id_relay);
-
// These are thread safe.
- DECLARE_SUBSCRIBER(address);
- DECLARE_SUBSCRIBER(alert);
- DECLARE_SUBSCRIBER(block);
- DECLARE_SUBSCRIBER(bloom_filter_add);
- DECLARE_SUBSCRIBER(bloom_filter_clear);
- DECLARE_SUBSCRIBER(bloom_filter_load);
- DECLARE_SUBSCRIBER(client_filter);
- DECLARE_SUBSCRIBER(client_filter_checkpoint);
- DECLARE_SUBSCRIBER(client_filter_headers);
- DECLARE_SUBSCRIBER(compact_block);
- DECLARE_SUBSCRIBER(compact_transactions);
- DECLARE_SUBSCRIBER(fee_filter);
- DECLARE_SUBSCRIBER(get_address);
- DECLARE_SUBSCRIBER(get_blocks);
- DECLARE_SUBSCRIBER(get_client_filter_checkpoint);
- DECLARE_SUBSCRIBER(get_client_filter_headers);
- DECLARE_SUBSCRIBER(get_client_filters);
- DECLARE_SUBSCRIBER(get_compact_transactions);
- DECLARE_SUBSCRIBER(get_data);
- DECLARE_SUBSCRIBER(get_headers);
- DECLARE_SUBSCRIBER(headers);
- DECLARE_SUBSCRIBER(inventory);
- DECLARE_SUBSCRIBER(memory_pool);
- DECLARE_SUBSCRIBER(merkle_block);
- DECLARE_SUBSCRIBER(not_found);
- DECLARE_SUBSCRIBER(ping);
- DECLARE_SUBSCRIBER(pong);
- DECLARE_SUBSCRIBER(reject);
- DECLARE_SUBSCRIBER(send_address_v2);
- DECLARE_SUBSCRIBER(send_compact);
- DECLARE_SUBSCRIBER(send_headers);
- DECLARE_SUBSCRIBER(transaction);
- DECLARE_SUBSCRIBER(version);
- DECLARE_SUBSCRIBER(version_acknowledge);
- DECLARE_SUBSCRIBER(witness_tx_id_relay);
+ DECLARE_SUBSCRIBER(address)
+ DECLARE_SUBSCRIBER(alert)
+ DECLARE_SUBSCRIBER(block)
+ DECLARE_SUBSCRIBER(bloom_filter_add)
+ DECLARE_SUBSCRIBER(bloom_filter_clear)
+ DECLARE_SUBSCRIBER(bloom_filter_load)
+ DECLARE_SUBSCRIBER(client_filter)
+ DECLARE_SUBSCRIBER(client_filter_checkpoint)
+ DECLARE_SUBSCRIBER(client_filter_headers)
+ DECLARE_SUBSCRIBER(compact_block)
+ DECLARE_SUBSCRIBER(compact_transactions)
+ DECLARE_SUBSCRIBER(fee_filter)
+ DECLARE_SUBSCRIBER(get_address)
+ DECLARE_SUBSCRIBER(get_blocks)
+ DECLARE_SUBSCRIBER(get_client_filter_checkpoint)
+ DECLARE_SUBSCRIBER(get_client_filter_headers)
+ DECLARE_SUBSCRIBER(get_client_filters)
+ DECLARE_SUBSCRIBER(get_compact_transactions)
+ DECLARE_SUBSCRIBER(get_data)
+ DECLARE_SUBSCRIBER(get_headers)
+ DECLARE_SUBSCRIBER(headers)
+ DECLARE_SUBSCRIBER(inventory)
+ DECLARE_SUBSCRIBER(memory_pool)
+ DECLARE_SUBSCRIBER(merkle_block)
+ DECLARE_SUBSCRIBER(not_found)
+ DECLARE_SUBSCRIBER(ping)
+ DECLARE_SUBSCRIBER(pong)
+ DECLARE_SUBSCRIBER(reject)
+ DECLARE_SUBSCRIBER(send_address_v2)
+ DECLARE_SUBSCRIBER(send_compact)
+ DECLARE_SUBSCRIBER(send_headers)
+ DECLARE_SUBSCRIBER(transaction)
+ DECLARE_SUBSCRIBER(version)
+ DECLARE_SUBSCRIBER(version_acknowledge)
+ DECLARE_SUBSCRIBER(witness_tx_id_relay)
memory& memory_;
};
@@ -212,8 +138,6 @@ code distributor_peer::do_notify(
#undef SUBSCRIBER
#undef SUBSCRIBER_TYPE
-#undef DEFINE_SUBSCRIBER
-#undef SUBSCRIBER_OVERLOAD
#undef DECLARE_SUBSCRIBER
} // namespace network
diff --git a/include/bitcoin/network/distributors/distributor_rpc.hpp b/include/bitcoin/network/distributors/distributor_rpc.hpp
index ee1d596be..35b65f9df 100644
--- a/include/bitcoin/network/distributors/distributor_rpc.hpp
+++ b/include/bitcoin/network/distributors/distributor_rpc.hpp
@@ -22,69 +22,135 @@
#include
#include
#include
+#include
#include
namespace libbitcoin {
namespace network {
-// Macro for declaring method subscribers.
-#define DECLARE_METHOD(name_, ...) \
-using name_##_handler = std::function; \
-using name_##_subscriber_type = unsubscriber<__VA_ARGS__>; \
-name_##_subscriber_type name_##_subscriber_; \
-static code notify_##name_(distributor_rpc& self, \
- const json::params_option& params); \
-code subscribe_##name_(name_##_handler&& handler) NOEXCEPT \
-{ return name_##_subscriber_.subscribe(std::move(handler)); }
-
/// Not thread safe.
-class BCT_API distributor_rpc
+template
+class distributor_rpc
{
public:
- DELETE_COPY_MOVE_DESTRUCT(distributor_rpc);
+ DELETE_COPY(distributor_rpc);
+ DEFAULT_MOVE(distributor_rpc);
- /// Create an instance of this class.
- distributor_rpc(asio::strand& strand) NOEXCEPT;
+ /// If stopped, handler is invoked with error::subscriber_stopped.
+ /// If key exists, handler is invoked with error::subscriber_exists.
+ /// Otherwise handler retained. Subscription code is also returned here.
+ template
+ inline code subscribe(Handler&& handler) NOEXCEPT;
- /// Stop all unsubscribers with the given code.
- virtual void stop(const code& ec) NOEXCEPT;
+ /// Create an instance of this class.
+ inline distributor_rpc(asio::strand& strand) NOEXCEPT;
+ virtual ~distributor_rpc() = default;
/// Dispatch the request to the appropriate method's unsubscriber.
- virtual code notify(const json::request_t& request) NOEXCEPT;
+ virtual inline code notify(const json::request_t& request) NOEXCEPT;
-protected:
- /// Example methods.
- DECLARE_METHOD(get_version)
- DECLARE_METHOD(add_element, int, int)
+ /// Stop all unsubscribers with the given code.
+ virtual inline void stop(const code& ec) NOEXCEPT;
private:
+ // make_subscribers
+ // ------------------------------------------------------------------------
+
+ template
+ struct subscriber_type;
+
+ template
+ struct subscriber_type>
+ {
+ // The subscriber signature/handler includes the tag.
+ using tag = typename rpc::method::tag;
+ using type = network::unsubscriber;
+ };
+
+ template
+ using subscriber_t = typename subscriber_type::type;
+
+ template
+ struct subscribers_type;
+
+ template
+ struct subscribers_type>
+ {
+ using type = std::tuple...>;
+ };
+
+ using methods_t = std::remove_const_t;
+ using subscribers_t = typename subscribers_type::type;
+
+ template
+ static inline subscribers_t make_subscribers(asio::strand& strand,
+ std::index_sequence) NOEXCEPT;
+
+ subscribers_t subscribers_;
+
+ // make_dispatchers
+ // ------------------------------------------------------------------------
+
template
using names_t = std::array;
using optional_t = json::params_option;
using functor_t = std::function;
using dispatch_t = std::unordered_map;
- enum class container{ positional, named, either };
+ using sequence_t = std::make_index_sequence;
+
+ static inline bool has_params(const optional_t& parameters) NOEXCEPT;
template
- static Type extract(const json::value_t& value) THROWS;
- template
- static std::tuple extractor(const optional_t& parameters,
- container mode, const names_t& names) THROWS;
- template
- static code notifier(auto& subscriber, const optional_t& parameters,
- container mode, auto&& tuple) NOEXCEPT;
- static bool has_params(const optional_t& parameters) NOEXCEPT;
+ static inline Type extract(const json::value_t& value) THROWS;
+
+ template
+ static inline Tuple extractor(const optional_t& parameters,
+ const std::array>& names) THROWS;
+
+ template
+ static inline code notifier(subscriber_t& subscriber,
+ const optional_t& parameters, const typename Method::names_t& names) NOEXCEPT;
+
+ template
+ static inline code do_notify(distributor_rpc& self,
+ const optional_t& params) NOEXCEPT;
+
+ template
+ static inline constexpr dispatch_t make_dispatchers(
+ std::index_sequence) NOEXCEPT;
- // Function map, find name and dispatch with request.
static const dispatch_t dispatch_;
+
+ // subscribe helpers
+ // ------------------------------------------------------------------------
+
+ template
+ struct traits;
+
+ template
+ struct traits>
+ : traits {};
+
+ template
+ struct traits
+ {
+ using tag = Tag;
+ using args = std::tuple;
+ };
+
+ template
+ static inline constexpr size_t find_tag_index() NOEXCEPT;
};
} // namespace network
} // namespace libbitcoin
-#undef DECLARE_METHOD
+#define TEMPLATE template
+#define CLASS distributor_rpc
#include
+#undef CLASS
+#undef TEMPLATE
+
#endif
diff --git a/include/bitcoin/network/distributors/distributor_stratum_v1.hpp b/include/bitcoin/network/distributors/distributor_stratum_v1.hpp
new file mode 100644
index 000000000..b6abd8661
--- /dev/null
+++ b/include/bitcoin/network/distributors/distributor_stratum_v1.hpp
@@ -0,0 +1,30 @@
+/**
+ * Copyright (c) 2011-2025 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifndef LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_STRATUM_V1_HPP
+#define LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_STRATUM_V1_HPP
+
+#include
+
+namespace libbitcoin {
+namespace network {
+
+} // namespace network
+} // namespace libbitcoin
+
+#endif
diff --git a/include/bitcoin/network/distributors/distributors.hpp b/include/bitcoin/network/distributors/distributors.hpp
index 5beb39e79..3bd85140e 100644
--- a/include/bitcoin/network/distributors/distributors.hpp
+++ b/include/bitcoin/network/distributors/distributors.hpp
@@ -19,9 +19,14 @@
#ifndef LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTORS_HPP
#define LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTORS_HPP
-#include
#include
#include
#include
+// json-rpc
+#include
+#include
+#include
+#include
+
#endif
diff --git a/include/bitcoin/network/impl/distributors/distributor_rpc.ipp b/include/bitcoin/network/impl/distributors/distributor_rpc.ipp
index 195eb915d..2e83e6f6c 100644
--- a/include/bitcoin/network/impl/distributors/distributor_rpc.ipp
+++ b/include/bitcoin/network/impl/distributors/distributor_rpc.ipp
@@ -16,15 +16,256 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#ifndef LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_RPC_IPP
-#define LIBBITCOIN_NETWORK_DISTRIBUTORS_DISTRIBUTOR_RPC_IPP
+#include
+#include
+#include
+#include
+#include
+#include
#include
+#include
namespace libbitcoin {
namespace network {
+// make_dispatchers
+// ----------------------------------------------------------------------------
+
+TEMPLATE
+inline bool CLASS::has_params(const optional_t& parameters) NOEXCEPT
+{
+ if (!parameters.has_value())
+ return false;
+
+ return std::visit(overload
+ {
+ [](const json::array_t& param) NOEXCEPT
+ {
+ return !param.empty();
+ },
+ [](const json::object_t& param) NOEXCEPT
+ {
+ return !param.empty();
+ }
+ }, parameters.value());
+}
+
+TEMPLATE
+template
+inline Type CLASS::extract(const json::value_t& value) THROWS
+{
+ if constexpr (is_same_type)
+ return std::get(value.value());
+
+ if constexpr (is_same_type)
+ return std::get(value.value());
+
+ if constexpr (is_same_type)
+ return std::get(value.value());
+
+ // Hack: this is not a native json-rpc type.
+ if constexpr (is_same_type)
+ {
+ const auto& number = std::get(value.value());
+ if (number != std::floor(number) ||
+ number < std::numeric_limits::min() ||
+ number > std::numeric_limits::max()) {
+ throw std::invalid_argument{ "int" };
+ }
+
+ return system::to_integer(number);
+ }
+
+ ////throw std::bad_variant_access{};
+}
+
+TEMPLATE
+template
+inline Tuple CLASS::extractor(
+ const optional_t& parameters,
+ const std::array>& names) THROWS
+{
+ constexpr auto count = std::tuple_size_v;
+ if (is_zero(count) && !has_params(parameters))
+ return {};
+
+ if (!parameters.has_value())
+ throw std::invalid_argument{ "count" };
+
+ const auto get_array = [&](const json::array_t& array) THROWS
+ {
+ if (array.size() != count) throw std::invalid_argument{ "count" };
+ return [&](std::index_sequence)
+ {
+ return std::make_tuple(extract>(
+ array.at(Index))...);
+ }(std::make_index_sequence{});
+ };
+
+ const auto get_object = [&](const json::object_t& object) THROWS
+ {
+ if (object.size() != count) throw std::invalid_argument{ "count" };
+ return [&](std::index_sequence)
+ {
+ return std::make_tuple(extract>(
+ object.at(std::string{ names.at(Index) }))...);
+ }(std::make_index_sequence{});
+ };
+
+ const auto& params = parameters.value();
+
+ if constexpr (Interface::mode == rpc::group::positional)
+ {
+ if (!std::holds_alternative(params))
+ throw std::invalid_argument{ "positional" };
+
+ return get_array(std::get(params));
+ }
+ else if constexpr (Interface::mode == rpc::group::named)
+ {
+ if (!std::holds_alternative(params))
+ throw std::invalid_argument{ "named" };
+
+ return get_object(std::get(params));
+ }
+ else // if constexpr (Interface::mode == rpc::group::either)
+ {
+ if (std::holds_alternative(params))
+ return get_array(std::get(params));
+
+ if (std::holds_alternative(params))
+ return get_object(std::get(params));
+
+ throw std::invalid_argument{ "either" };
+ }
+}
+
+TEMPLATE
+template
+inline code CLASS::notifier(subscriber_t& subscriber,
+ const optional_t& parameters,
+ const typename Method::names_t& names) NOEXCEPT
+{
+ try
+ {
+ std::apply([&](auto&&... args) THROWS
+ {
+ subscriber.notify(error::success, typename Method::tag{},
+ std::forward(args)...);
+ }, extractor(parameters, names));
+
+ return error::success;
+ }
+ catch (...)
+ {
+ return error::not_found;
+ }
+}
+
+TEMPLATE
+template
+inline code CLASS::do_notify(distributor_rpc& self,
+ const optional_t& params) NOEXCEPT
+{
+ using method_t = std::tuple_element_t;
+ auto& subscriber = std::get(self.subscribers_);
+ const auto& names = std::get(Interface::methods).names();
+ return notifier(subscriber, params, names);
+}
+
+TEMPLATE
+template
+inline constexpr CLASS::dispatch_t CLASS::make_dispatchers(
+ std::index_sequence) NOEXCEPT
+{
+ return dispatch_t
+ {
+ std::make_pair
+ (
+ std::string{ std::tuple_element_t::name },
+ &do_notify
+ )...
+ };
+}
+
+TEMPLATE
+const typename CLASS::dispatch_t
+CLASS::dispatch_ = make_dispatchers(sequence_t{});
+
+// make_subscribers
+// ----------------------------------------------------------------------------
+
+TEMPLATE
+template
+inline CLASS::subscribers_t CLASS::make_subscribers(asio::strand& strand,
+ std::index_sequence) NOEXCEPT
+{
+ return std::make_tuple
+ (
+ subscriber_t>(strand)...
+ );
+}
+
+// subscribe helpers
+// ----------------------------------------------------------------------------
+
+TEMPLATE
+template
+inline constexpr size_t CLASS::find_tag_index() NOEXCEPT
+{
+ static_assert(Index < std::tuple_size_v);
+ using method_tag = typename std::tuple_element_t::tag;
+ if constexpr (!is_same_type)
+ return find_tag_index();
+
+ return Index;
+}
+
+// public
+// ----------------------------------------------------------------------------
+
+TEMPLATE
+template
+inline code CLASS::subscribe(Handler&& handler) NOEXCEPT
+{
+ using traits = traits>;
+ using tag = typename traits::tag;
+ using handler_args = typename traits::args;
+ constexpr auto index = find_tag_index();
+ using method_t = std::tuple_element_t;
+ using method_args = typename method_t::args;
+ static_assert(is_same_type);
+ auto& subscriber = std::get(subscribers_);
+ return subscriber.subscribe(std::forward(handler));
+}
+
+TEMPLATE
+inline CLASS::distributor_rpc(asio::strand& strand) NOEXCEPT
+ : subscribers_(make_subscribers(strand, sequence_t{}))
+{
+}
+
+TEMPLATE
+inline code CLASS::notify(const json::request_t& request) NOEXCEPT
+{
+ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
+ const auto it = dispatch_.find(request.method);
+ if (it == dispatch_.end())
+ return error::not_found;
+
+ return it->second(*this, request.params);
+ BC_POP_WARNING()
+}
+
+TEMPLATE
+inline void CLASS::stop(const code& ec) NOEXCEPT
+{
+ std::apply([&ec](auto&&... subscriber) NOEXCEPT
+ {
+ (subscriber.stop_default(ec), ...);
+ }, subscribers_);
+}
+
} // namespace network
} // namespace libbitcoin
-
-#endif
diff --git a/include/bitcoin/network/messages/json/rpc.hpp b/include/bitcoin/network/messages/json/rpc.hpp
index 2aa1fc476..1ec5f96a2 100644
--- a/include/bitcoin/network/messages/json/rpc.hpp
+++ b/include/bitcoin/network/messages/json/rpc.hpp
@@ -64,6 +64,14 @@ struct value_t
object_t
>;
+ /// Explicit initialization constructors.
+ value_t(null_t) NOEXCEPT : inner_{ null_t{} } {}
+ value_t(boolean_t value) NOEXCEPT : inner_{ value } {}
+ value_t(number_t value) NOEXCEPT : inner_{ value } {}
+ value_t(string_t value) NOEXCEPT : inner_{ std::move(value) } {}
+ value_t(array_t value) NOEXCEPT : inner_{ std::move(value) } {}
+ value_t(object_t value) NOEXCEPT : inner_{ std::move(value) } {}
+
/// Forwarding constructors for in-place variant construction.
FORWARD_VARIANT_CONSTRUCT(value_t, inner_)
FORWARD_VARIANT_ASSIGNMENT(value_t, inner_)
diff --git a/src/distributors/distributor_http.cpp b/src/distributors/distributor_http.cpp
index 8aa98bd2f..085e27d78 100644
--- a/src/distributors/distributor_http.cpp
+++ b/src/distributors/distributor_http.cpp
@@ -90,9 +90,9 @@ void distributor_http::stop(const code& ec) NOEXCEPT
#undef SUBSCRIBER
#undef MAKE_SUBSCRIBER
+#undef STOP_SUBSCRIBER
#undef DO_NOTIFY
#undef CASE_NOTIFY
-#undef STOP_SUBSCRIBER
} // namespace network
} // namespace libbitcoin
diff --git a/src/distributors/distributor_peer.cpp b/src/distributors/distributor_peer.cpp
index 86deb76e1..919a679c1 100644
--- a/src/distributors/distributor_peer.cpp
+++ b/src/distributors/distributor_peer.cpp
@@ -200,8 +200,8 @@ BC_POP_WARNING()
#undef SUBSCRIBER
#undef MAKE_SUBSCRIBER
-#undef CASE_NOTIFY
#undef STOP_SUBSCRIBER
+#undef CASE_NOTIFY
} // namespace network
} // namespace libbitcoin
diff --git a/src/distributors/distributor_rpc.cpp b/src/distributors/distributor_rpc.cpp
index de059f3d7..e6ad1064b 100644
--- a/src/distributors/distributor_rpc.cpp
+++ b/src/distributors/distributor_rpc.cpp
@@ -18,205 +18,10 @@
*/
#include
-#include
-#include
-#include
-#include
-#include
#include
-#include
namespace libbitcoin {
namespace network {
-// macros
-// ----------------------------------------------------------------------------
-#define DISPATCH(name) { #name, ¬ify_##name }
-#define PARAMS(...) std::make_tuple(__VA_ARGS__)
-#define DEFINE_METHOD(name, mode, names, ...) \
-code distributor_rpc::notify_##name(distributor_rpc& self, \
- const optional_t& params) { return notifier<__VA_ARGS__>( \
- self.name##_subscriber_, params, container::mode, names); }
-
-// extract/invoke
-// ----------------------------------------------------------------------------
-// private/static
-
-template
-Type distributor_rpc::extract(const json::value_t&) THROWS
-{
- throw std::bad_variant_access{};
-}
-
-template <>
-bool distributor_rpc::extract(const json::value_t& value) THROWS
-{
- return std::get(value.value());
-}
-
-template <>
-std::string distributor_rpc::extract(
- const json::value_t& value) THROWS
-{
- return std::get(value.value());
-}
-
-template <>
-double distributor_rpc::extract(const json::value_t& value) THROWS
-{
- return std::get(value.value());
-}
-
-template <>
-int distributor_rpc::extract(const json::value_t& value) THROWS
-{
- const auto& number = std::get(value.value());
- if (number != std::floor(number) ||
- number < std::numeric_limits::min() ||
- number > std::numeric_limits::max())
- {
- throw std::invalid_argument{ "int" };
- }
-
- return system::to_integer(number);
-}
-
-template
-std::tuple distributor_rpc::extractor(
- const optional_t& parameters, container mode,
- const names_t& names) THROWS
-{
- constexpr auto count = sizeof...(Args);
- if (is_zero(count) && !has_params(parameters))
- return {};
-
- if (!parameters.has_value())
- throw std::invalid_argument{ "count" };
-
- const auto get_array = [&](const json::array_t& array) THROWS
- {
- if (array.size() != count) throw std::invalid_argument{ "count" };
- return [&](std::index_sequence)
- {
- return std::make_tuple(extract(array.at(Index))...);
- }(std::make_index_sequence{});
- };
-
- const auto get_object = [&](const json::object_t& object) THROWS
- {
-
- if (object.size() != count) throw std::invalid_argument{ "count" };
- return [&](std::index_sequence)
- {
- return std::make_tuple(extract(
- object.at(std::string{ names.at(Index) }))...);
- }(std::make_index_sequence{});
- };
-
- const auto& params = parameters.value();
- switch (mode)
- {
- case container::positional:
- if (!std::holds_alternative(params))
- throw std::invalid_argument{ "positional" };
- return get_array(std::get(params));
- case container::named:
- if (!std::holds_alternative(params))
- throw std::invalid_argument{ "named" };
- return get_object(std::get(params));
- case container::either:
- if (std::holds_alternative(params))
- return get_array(std::get(params));
- if (std::holds_alternative(params))
- return get_object(std::get(params));
- }
-
- throw std::invalid_argument{ "container" };
-}
-
-template
-code distributor_rpc::notifier(auto& subscriber, const optional_t& parameters,
- container mode, auto&& tuple) NOEXCEPT
-{
- static auto names = std::apply([](auto&&... args) NOEXCEPT
- {
- return names_t{ std::forward(args)... };
- }, std::forward(tuple));
-
- try
- {
- std::apply([&](auto&&... args) THROWS
- {
- subscriber.notify({}, std::forward(args)...);
- }, extractor(parameters, mode, names));
-
- return error::success;
- }
- catch (...)
- {
- return error::not_found;
- }
-}
-
-bool distributor_rpc::has_params(const optional_t& parameters) NOEXCEPT
-{
- if (!parameters.has_value())
- return false;
-
- return std::visit(overload
- {
- [] (const json::array_t& param) NOEXCEPT
- {
- return !param.empty();
- },
- [](const json::object_t& param) NOEXCEPT
- {
- return !param.empty();
- }
- }, parameters.value());
-}
-
-// define map
-// ----------------------------------------------------------------------------
-
-// Example methods.
-
-// protected methods with parameter names and types
-DEFINE_METHOD(get_version, either, PARAMS())
-DEFINE_METHOD(add_element, either, PARAMS("a", "b"), int, int)
-
-// private map
-const distributor_rpc::dispatch_t distributor_rpc::dispatch_
-{
- DISPATCH(get_version),
- DISPATCH(add_element)
-};
-
-// public
-// ----------------------------------------------------------------------------
-
-distributor_rpc::distributor_rpc(asio::strand& strand) NOEXCEPT
- : get_version_subscriber_{ strand },
- add_element_subscriber_{ strand }
-{
-}
-
-void distributor_rpc::stop(const code& ec) NOEXCEPT
-{
- get_version_subscriber_.stop_default(ec);
- add_element_subscriber_.stop_default(ec);
-}
-
-code distributor_rpc::notify(const json::request_t& request) NOEXCEPT
-{
- BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
- const auto it = dispatch_.find(request.method);
- if (it == dispatch_.end())
- return error::not_found;
-
- return it->second(*this, request.params);
- BC_POP_WARNING()
-}
-
} // namespace network
} // namespace libbitcoin
diff --git a/test/distributors/distributor_rpc.cpp b/test/distributors/distributor_rpc.cpp
index 1566ace05..55b19c9dd 100644
--- a/test/distributors/distributor_rpc.cpp
+++ b/test/distributors/distributor_rpc.cpp
@@ -18,6 +18,553 @@
*/
#include "../test.hpp"
-BOOST_AUTO_TEST_SUITE(distributor_tests)
+#include
+
+BOOST_AUTO_TEST_SUITE(distributor_rpc_tests)
+
+using namespace json;
+
+static_assert( is_same_type, rpc::method<"test2">>);
+static_assert(!is_same_type, rpc::method<"test2">>);
+static_assert(!is_same_type, rpc::method<"test1", int>>);
+static_assert(!is_same_type, rpc::method<"test2", bool>>);
+static_assert( is_same_type, rpc::method<"test1", bool>>);
+
+using get_version = std::tuple_element_t<0, decltype(bitcoind::methods)>::tag;
+using add_element = std::tuple_element_t<1, decltype(bitcoind::methods)>::tag;
+
+BOOST_AUTO_TEST_CASE(distributor_rpc__construct__stop__stops)
+{
+ threadpool pool(2);
+ asio::strand strand(pool.service().get_executor());
+ distributor_rpc instance(strand);
+
+ std::promise promise{};
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ instance.stop(error::service_stopped);
+ promise.set_value(true);
+ });
+
+ pool.stop();
+ BOOST_REQUIRE(pool.join());
+ BOOST_REQUIRE(promise.get_future().get());
+}
+
+BOOST_AUTO_TEST_CASE(distributor_rpc__notify__no_subscriber__success)
+{
+ threadpool pool(2);
+ asio::strand strand(pool.service().get_executor());
+ distributor_rpc instance(strand);
+
+ std::promise promise{};
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ request_t request{};
+ request.method = "get_version";
+ const auto ec = instance.notify(request);
+ promise.set_value(ec);
+ });
+
+ pool.stop();
+ BOOST_REQUIRE(pool.join());
+ BOOST_REQUIRE_EQUAL(promise.get_future().get(), error::success);
+}
+
+BOOST_AUTO_TEST_CASE(distributor_rpc__notify__unknown_method__returns_not_found)
+{
+ threadpool pool(2);
+ asio::strand strand(pool.service().get_executor());
+ distributor_rpc instance(strand);
+
+ std::promise promise{};
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ request_t request{};
+ request.method = "unknown_method";
+ const auto ec = instance.notify(request);
+ promise.set_value(ec);
+ });
+
+ pool.stop();
+ BOOST_REQUIRE(pool.join());
+ BOOST_REQUIRE_EQUAL(promise.get_future().get(), error::not_found);
+}
+
+BOOST_AUTO_TEST_CASE(distributor_rpc__subscribe__stopped__subscriber_stopped)
+{
+ threadpool pool(2);
+ asio::strand strand(pool.service().get_executor());
+ distributor_rpc instance(strand);
+ constexpr auto expected_ec = error::subscriber_stopped;
+ code subscribe_ec{};
+ auto result = true;
+
+ std::promise promise{};
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ instance.stop(error::invalid_magic);
+ subscribe_ec = instance.subscribe([&](const code& ec, add_element, int a, int b) NOEXCEPT
+ {
+ // Stop notification sets defaults and specified code.
+ result &= is_zero(a);
+ result &= is_zero(b);
+ promise.set_value(ec);
+ return false;
+ });
+ });
+
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ instance.stop(error::invalid_magic);
+ });
+
+ pool.stop();
+ BOOST_REQUIRE(pool.join());
+ BOOST_REQUIRE_EQUAL(promise.get_future().get(), expected_ec);
+ BOOST_REQUIRE_EQUAL(subscribe_ec, expected_ec);
+ BOOST_REQUIRE(result);
+}
+
+BOOST_AUTO_TEST_CASE(distributor_rpc__subscribe__stop__service_stopped)
+{
+ threadpool pool(2);
+ asio::strand strand(pool.service().get_executor());
+ distributor_rpc instance(strand);
+ constexpr auto expected_ec = error::service_stopped;
+ code subscribe_ec{};
+ auto result = true;
+
+ std::promise promise;
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ subscribe_ec = instance.subscribe([&](const code& ec, add_element, int a, int b) NOEXCEPT
+ {
+ // Stop notification sets defaults and specified code.
+ result &= is_zero(a);
+ result &= is_zero(b);
+ promise.set_value(ec);
+ return true;
+ });
+ });
+
+ boost::asio::post(strand, [&]() NOEXCEPT
+ {
+ instance.stop(expected_ec);
+ });
+
+ pool.stop();
+ BOOST_REQUIRE(pool.join());
+ BOOST_REQUIRE_EQUAL(promise.get_future().get(), expected_ec);
+ BOOST_REQUIRE(!subscribe_ec);
+ BOOST_REQUIRE(result);
+}
+
+////BOOST_AUTO_TEST_CASE(distributor_rpc__subscribe__multiple__both_return_success_no_invoke)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool first_called{};
+//// bool second_called{};
+//// std::promise> promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// const auto ec1 = instance.subscribe(
+//// [&](const code&, add_element, int, int) NOEXCEPT
+//// {
+//// first_called = true;
+//// return true;
+//// });
+////
+//// const auto ec2 = instance.subscribe(
+//// [&](const code&, add_element, int, int) NOEXCEPT
+//// {
+//// second_called = true;
+//// return true;
+//// });
+////
+//// promise.set_value({ec1, ec2});
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+////
+//// const auto [ec1, ec2] = promise.get_future().get();
+//// BOOST_REQUIRE(!ec1);
+//// BOOST_REQUIRE(!ec2);
+//// BOOST_REQUIRE(!first_called);
+//// BOOST_REQUIRE(!second_called);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__multiple_subscribers__invokes_both)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool first_called{};
+//// bool second_called{};
+//// int first_result_a{};
+//// int first_result_b{};
+//// int second_result_a{};
+//// int second_result_b{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe(
+//// [&](const code&, add_element, int a, int b) NOEXCEPT
+//// {
+//// first_called = true;
+//// first_result_a = a;
+//// first_result_b = b;
+//// return true;
+//// });
+////
+//// instance.subscribe(
+//// [&](const code&, add_element, int a, int b) NOEXCEPT
+//// {
+//// second_called = true;
+//// second_result_a = a;
+//// second_result_b = b;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "add_element";
+//// array_t params_array{ value_t{ 42.0 }, value_t{ 24.0 } };
+//// request.params = params_t{ params_array };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE(!promise.get_future().get());
+//// BOOST_REQUIRE(first_called);
+//// BOOST_REQUIRE(second_called);
+//// BOOST_REQUIRE_EQUAL(first_result_a, 42);
+//// BOOST_REQUIRE_EQUAL(first_result_b, 24);
+//// BOOST_REQUIRE_EQUAL(second_result_a, 42);
+//// BOOST_REQUIRE_EQUAL(second_result_b, 24);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__stop__subscribed_handler__notifies_with_error_code)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// code result_ec{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe([&](const code& ec, get_version) NOEXCEPT
+//// {
+//// called = true;
+//// result_ec = ec;
+//// return false;
+//// });
+////
+//// instance.stop(error::service_stopped);
+//// promise.set_value(true);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE(promise.get_future().get());
+//// BOOST_REQUIRE(called);
+//// BOOST_REQUIRE_EQUAL(result_ec, error::service_stopped);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__get_version_no_params__success_and_notifies)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// code result_ec{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe([&](const code& ec, get_version) NOEXCEPT
+//// {
+//// called = true;
+//// result_ec = ec;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "get_version";
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// const auto notify_ec = promise.get_future().get();
+//// BOOST_REQUIRE(!notify_ec);
+//// BOOST_REQUIRE(called);
+//// BOOST_REQUIRE(!result_ec);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__get_version_empty_array_params__success_and_notifies)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe([&](const code&, get_version) NOEXCEPT
+//// {
+//// called = true;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "get_version";
+//// request.params = params_t{ array_t{} };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE(!promise.get_future().get());
+//// BOOST_REQUIRE(called);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__get_version_non_empty_params__not_found_no_notify)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe([&](const code&, get_version) NOEXCEPT
+//// {
+//// called = true;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "get_version";
+//// array_t params_array{ value_t{ 1.0 } };
+//// request.params = params_t{ params_array };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE_EQUAL(promise.get_future().get(), error::not_found);
+//// BOOST_REQUIRE(!called);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__add_element_positional_params__success_and_notifies_with_args)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// int result_a{};
+//// int result_b{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe(
+//// [&](const code&, add_element, int a, int b) NOEXCEPT
+//// {
+//// called = true;
+//// result_a = a;
+//// result_b = b;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "add_element";
+//// array_t params_array{ value_t{ 42.0 }, value_t{ 24.0 } };
+//// request.params = params_t{ params_array };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE(!promise.get_future().get());
+//// BOOST_REQUIRE(called);
+//// BOOST_REQUIRE_EQUAL(result_a, 42);
+//// BOOST_REQUIRE_EQUAL(result_b, 24);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__add_element_named_params__success_and_notifies_with_args)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// int result_a{};
+//// int result_b{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe(
+//// [&](const code&, add_element, int a, int b) NOEXCEPT
+//// {
+//// called = true;
+//// result_a = a;
+//// result_b = b;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "add_element";
+//// object_t params_object{ { "a", value_t{ 42.0 } }, { "b", value_t{ 24.0 } } };
+//// request.params = params_t{ params_object };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE(!promise.get_future().get());
+//// BOOST_REQUIRE(called);
+//// BOOST_REQUIRE_EQUAL(result_a, 42);
+//// BOOST_REQUIRE_EQUAL(result_b, 24);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__add_element_wrong_param_count__not_found_no_notify)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe(
+//// [&](const code&, add_element, int, int) NOEXCEPT
+//// {
+//// called = true;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "add_element";
+//// array_t params_array{ value_t{ 42.0 } };
+//// request.params = params_t{ params_array };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE_EQUAL(promise.get_future().get(), error::not_found);
+//// BOOST_REQUIRE(!called);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__add_element_invalid_type__not_found_no_notify)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// std::promise promise{};
+//// boost::asio::post(strand, [&]() NOEXCEPT
+//// {
+//// instance.subscribe([&](const code&, add_element, int, int) NOEXCEPT
+//// {
+//// called = true;
+//// return true;
+//// });
+////
+//// request_t request{};
+//// request.method = "add_element";
+//// array_t params_array{ value_t{ string_t{ "invalid" } }, value_t{ 24.0 } };
+//// request.params = params_t{ params_array };
+//// const auto ec = instance.notify(request);
+//// promise.set_value(ec);
+//// });
+////
+//// pool.stop();
+//// BOOST_REQUIRE(pool.join());
+//// BOOST_REQUIRE_EQUAL(promise.get_future().get(), error::not_found);
+//// BOOST_REQUIRE(!called);
+////}
+////
+////BOOST_AUTO_TEST_CASE(distributor_rpc__notify__add_element_non_integral_number__not_found_no_notify)
+////{
+//// threadpool pool(2);
+//// asio::strand strand(pool.service().get_executor());
+//// distributor_rpc instance(strand);
+////
+//// bool called{};
+//// std::promise