Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion src/cryptonote_core/cryptonote_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ namespace cryptonote
, "Run a program for each new block, '%s' will be replaced by the block hash"
, ""
};
static const command_line::arg_descriptor<std::string> arg_sync_notify = {
"sync-notify"
, "Run a program when we start or stop syncing, '%h' will be replaced by the "
"current height, '%t' by the target height, '%s' by 1 if synced, 0 if not."
, ""
};
static const command_line::arg_descriptor<bool> arg_prune_blockchain = {
"prune-blockchain"
, "Prune blockchain"
Expand Down Expand Up @@ -345,6 +351,7 @@ namespace cryptonote
command_line::add_arg(desc, arg_sync_pruned_blocks);
command_line::add_arg(desc, arg_max_txpool_weight);
command_line::add_arg(desc, arg_block_notify);
command_line::add_arg(desc, arg_sync_notify);
command_line::add_arg(desc, arg_prune_blockchain);
command_line::add_arg(desc, arg_reorg_notify);
command_line::add_arg(desc, arg_block_rate_notify);
Expand Down Expand Up @@ -647,6 +654,28 @@ namespace cryptonote
MERROR("Failed to parse block notify spec: " << e.what());
}

try
{
if (!command_line::is_arg_defaulted(vm, arg_sync_notify))
{
struct sync_notify
{
tools::Notify cmdline;

void operator()(bool syncing, std::uint64_t height, std::uint64_t target) const
{
cmdline.notify("%s", syncing ? "1" : " 0", "%h", std::to_string(height).c_str(), "%t", std::to_string(target).c_str(), NULL);
}
};

add_sync_notify(sync_notify{{command_line::get_arg(vm, arg_sync_notify).c_str()}});
}
}
catch (const std::exception &e)
{
MERROR("Failed to parse sync notify spec: " << e.what());
}

try
{
if (!command_line::is_arg_defaulted(vm, arg_reorg_notify))
Expand Down Expand Up @@ -1451,6 +1480,32 @@ namespace cryptonote
m_miner.resume();
}
//-----------------------------------------------------------------------------------------------
void core::add_sync_notify(boost::function<void(bool, std::uint64_t, std::uint64_t)>&& notify)
{
if (notify)
{
m_sync_notifiers.push_back(std::move(notify));
}
}
//-----------------------------------------------------------------------------------------------
void core::on_start_syncing(uint64_t target)
{
MINFO("Starting syncing");
const uint64_t current_blockchain_height = get_current_blockchain_height();
if (target >= current_blockchain_height + 5) // don't switch to unsafe mode just for a few blocks
safesyncmode(false);
for (const auto& notifier : m_sync_notifiers)
notifier(true, current_blockchain_height, target);
}
//-----------------------------------------------------------------------------------------------
void core::on_stop_syncing()
{
MINFO("Stopping syncing");
safesyncmode(true);
for (const auto& notifier : m_sync_notifiers)
notifier(false, get_current_blockchain_height(), 0);
}
//-----------------------------------------------------------------------------------------------
block_complete_entry get_block_complete_entry(block& b, tx_memory_pool &pool)
{
block_complete_entry bce;
Expand Down Expand Up @@ -1537,7 +1592,12 @@ namespace cryptonote
//-----------------------------------------------------------------------------------------------
bool core::add_new_block(const block& b, block_verification_context& bvc)
{
return m_blockchain_storage.add_new_block(b, bvc);
const bool syncing = get_current_blockchain_height() < get_target_blockchain_height();
if (!m_blockchain_storage.add_new_block(b, bvc))
return false;
if (syncing && get_current_blockchain_height() >= get_target_blockchain_height())
on_stop_syncing();
return true;
}

//-----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -2025,6 +2085,11 @@ namespace cryptonote
//-----------------------------------------------------------------------------------------------
void core::set_target_blockchain_height(uint64_t target_blockchain_height)
{
const uint64_t height = get_current_blockchain_height();
if (m_target_blockchain_height > height && target_blockchain_height <= height)
on_stop_syncing();
else if (target_blockchain_height > height)
on_start_syncing(target_blockchain_height);
m_target_blockchain_height = target_blockchain_height;
}
//-----------------------------------------------------------------------------------------------
Expand Down
12 changes: 12 additions & 0 deletions src/cryptonote_core/cryptonote_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,16 @@ namespace cryptonote
*/
bool get_txpool_complement(const std::vector<crypto::hash> &hashes, std::vector<cryptonote::blobdata> &txes);

/**
* @brief sets a sync notify object to call when we start/stop syncing
*
* @param notify the notify object to call
*/
void add_sync_notify(boost::function<void(bool, uint64_t, uint64_t)> &&notify);

void on_start_syncing(uint64_t target);
void on_stop_syncing();

private:

/**
Expand Down Expand Up @@ -1131,6 +1141,8 @@ namespace cryptonote

std::shared_ptr<tools::Notify> m_block_rate_notify;
boost::function<void(std::vector<txpool_event>)> m_zmq_pub;

std::vector<boost::function<void(bool, std::uint64_t, std::uint64_t)>> m_sync_notifiers;
};
}

Expand Down
1 change: 1 addition & 0 deletions src/cryptonote_protocol/cryptonote_protocol_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ namespace cryptonote
void log_connections();
std::list<connection_info> get_connections();
const block_queue &get_block_queue() const { return m_block_queue; }
bool has_more_blocks_queued() const { return m_block_queue.get_data_size() > 0; }
void stop();
void on_connection_close(cryptonote_connection_context &context);
void set_max_out_peers(unsigned int max) { m_max_out_peers = max; }
Expand Down
5 changes: 0 additions & 5 deletions src/cryptonote_protocol/cryptonote_protocol_handler.inl
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,6 @@ namespace cryptonote
<< " [Your node is " << abs_diff << " blocks (" << tools::get_human_readable_timespan((abs_diff - diff_v2) * DIFFICULTY_TARGET_V1 + diff_v2 * DIFFICULTY_TARGET_V2) << ") "
<< (0 <= diff ? std::string("behind") : std::string("ahead"))
<< "] " << ENDL << "SYNCHRONIZATION started");
if (hshd.current_height >= m_core.get_current_blockchain_height() + 5) // don't switch to unsafe mode just for a few blocks
{
m_core.safesyncmode(false);
}
if (m_core.get_target_blockchain_height() == 0) // only when sync starts
{
m_sync_timer.resume();
Expand Down Expand Up @@ -2479,7 +2475,6 @@ skip:
}
m_core.on_synchronized();
}
m_core.safesyncmode(true);
m_p2p->clear_used_stripe_peers();

// ask for txpool complement from any suitable node if we did not yet
Expand Down
1 change: 1 addition & 0 deletions src/daemon/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ struct t_internals {
{
core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared});
core.get().get_blockchain_storage().add_miner_notify(cryptonote::listener::zmq_pub::miner_data{shared});
core.get().add_sync_notify(cryptonote::listener::zmq_pub::sync{shared});
core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared});
}
}
Expand Down
66 changes: 63 additions & 3 deletions src/rpc/zmq_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@
namespace
{
constexpr const char txpool_signal[] = "tx_signal";
constexpr const char sync_signal[] = "sync_signal";

using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>);
using miner_writer = void(epee::byte_stream&, uint8_t, uint64_t, const crypto::hash&, const crypto::hash&, cryptonote::difficulty_type, uint64_t, uint64_t, const std::vector<cryptonote::tx_block_template_backlog_entry>&);
using sync_writer = void(epee::byte_stream&, bool, std::uint64_t, std::uint64_t);
using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>);

template<typename F>
Expand Down Expand Up @@ -132,6 +134,14 @@ namespace
const std::vector<cryptonote::tx_block_template_backlog_entry>& tx_backlog;
};

//! Object for sync notification serialization
struct minimal_sync
{
const bool syncing;
const std::uint64_t height;
const std::uint64_t target;
};

//! Object for "minimal" tx serialization
struct minimal_txpool
{
Expand Down Expand Up @@ -187,6 +197,17 @@ namespace
dest.EndObject();
}

void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_sync self)
{
namespace adapt = boost::adaptors;

dest.StartObject();
INSERT_INTO_JSON_OBJECT(dest, syncing, self.syncing);
INSERT_INTO_JSON_OBJECT(dest, height, self.height);
INSERT_INTO_JSON_OBJECT(dest, target, self.target);
dest.EndObject();
}

void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
{
json_pub(buf, blocks);
Expand All @@ -202,6 +223,11 @@ namespace
json_pub(buf, miner_data{major_version, height, prev_id, seed_hash, diff, median_weight, already_generated_coins, tx_backlog});
}

void json_minimal_sync(epee::byte_stream& buf, bool syncing, const std::uint64_t height, const std::uint64_t target)
{
json_pub(buf, minimal_sync{syncing, height, target});
}

// boost::adaptors are in place "views" - no copy/move takes place
// moving transactions (via sort, etc.), is expensive!

Expand Down Expand Up @@ -236,6 +262,12 @@ namespace
{u8"json-full-miner_data", json_miner_data},
}};

constexpr const std::array<context<sync_writer>, 2> sync_contexts =
{{
{u8"json-full-sync", json_minimal_sync},
{u8"json-minimal-sync", json_minimal_sync},
}};

constexpr const std::array<context<txpool_writer>, 2> txpool_contexts =
{{
{u8"json-full-txpool_add", json_full_txpool},
Expand Down Expand Up @@ -336,7 +368,7 @@ namespace
zmq_msg_size(std::addressof(msg))
};

if (payload == txpool_signal)
if (payload == txpool_signal || payload == sync_signal)
{
zmq_msg_close(std::addressof(msg));
return false;
Expand All @@ -360,6 +392,7 @@ zmq_pub::zmq_pub(void* context)
: relay_(),
chain_subs_{{0}},
miner_subs_{{0}},
sync_subs_({0}),
txpool_subs_{{0}},
sync_()
{
Expand All @@ -368,6 +401,7 @@ zmq_pub::zmq_pub(void* context)

verify_sorted(chain_contexts, "chain_contexts");
verify_sorted(miner_contexts, "miner_contexts");
verify_sorted(sync_contexts, "sync_contexts");
verify_sorted(txpool_contexts, "txpool_contexts");

relay_.reset(zmq_socket(context, ZMQ_PAIR));
Expand All @@ -389,24 +423,27 @@ bool zmq_pub::sub_request(boost::string_ref message)

const auto chain_range = get_range(chain_contexts, message);
const auto miner_range = get_range(miner_contexts, message);
const auto sync_range = get_range(sync_contexts, message);
const auto txpool_range = get_range(txpool_contexts, message);

if (!chain_range.empty() || !miner_range.empty() || !txpool_range.empty())
if (!chain_range.empty() || !miner_range.empty() || !sync_range.empty() || !txpool_range.empty())
{
MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " <<
chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s) and " << txpool_range.size() << " txpool topic(s)");
chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s), " << sync_range.size() << " sync topic(s) and " << txpool_range.size() << " txpool topic(s)");

const boost::lock_guard<boost::mutex> lock{sync_};
switch (tag)
{
case 0:
remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
remove_subscriptions(miner_subs_, miner_range, miner_contexts.begin());
remove_subscriptions(sync_subs_, sync_range, sync_contexts.begin());
remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
return true;
case 1:
add_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
add_subscriptions(miner_subs_, miner_range, miner_contexts.begin());
add_subscriptions(sync_subs_, sync_range, sync_contexts.begin());
add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
return true;
default:
Expand Down Expand Up @@ -498,6 +535,20 @@ std::size_t zmq_pub::send_miner_data(uint8_t major_version, uint64_t height, con
return 0;
}

std::size_t zmq_pub::send_sync(bool syncing, std::uint64_t height, std::uint64_t target)
{
const boost::lock_guard<boost::mutex> lock{sync_};
for (const std::size_t sub : sync_subs_)
{
if (sub)
{
auto messages = make_pubs(sync_subs_, sync_contexts, syncing, height, target);
return send_messages(relay_.get(), messages);
}
}
return 0;
}

std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes)
{
if (txes.empty())
Expand Down Expand Up @@ -537,6 +588,15 @@ void zmq_pub::miner_data::operator()(uint8_t major_version, uint64_t height, con
MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
}

void zmq_pub::sync::operator()(bool syncing, std::uint64_t height, std::uint64_t target) const
{
const std::shared_ptr<zmq_pub> self = self_.lock();
if (self)
self->send_sync(syncing, height, target);
else
MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
}

void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const
{
const std::shared_ptr<zmq_pub> self = self_.lock();
Expand Down
13 changes: 13 additions & 0 deletions src/rpc/zmq_pub.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class zmq_pub
std::deque<std::vector<txpool_event>> txes_;
std::array<std::size_t, 2> chain_subs_;
std::array<std::size_t, 1> miner_subs_;
std::array<std::size_t, 2> sync_subs_;
std::array<std::size_t, 2> txpool_subs_;
boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays.

Expand Down Expand Up @@ -95,6 +96,11 @@ class zmq_pub
\return Number of ZMQ messages sent to relay. */
std::size_t send_miner_data(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<tx_block_template_backlog_entry>& tx_backlog);

/*! Send a `ZMQ_PUB` notification when starting/stopping syncing.
Thread-safe.
\return Number of ZMQ messages sent to relay. */
std::size_t send_sync(bool syncing, std::uint64_t height, std::uint64_t target);

/*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local
pool. Thread-safe.
\return Number of ZMQ messages sent to relay. */
Expand All @@ -114,6 +120,13 @@ class zmq_pub
void operator()(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<tx_block_template_backlog_entry>& tx_backlog) const;
};

//! Callable for `send_sync` with weak ownership to `zmq_pub` object.
struct sync
{
std::weak_ptr<zmq_pub> self_;
void operator()(bool, std::uint64_t height, std::uint64_t target) const;
};

//! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object.
struct txpool_add
{
Expand Down