diff --git a/src/Makefile.am b/src/Makefile.am index 9a3b18c65482a..edd19642a2a07 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -271,6 +271,7 @@ BITCOIN_CORE_H = \ llmq/quorums.h \ llmq/signhash.h \ llmq/signing.h \ + llmq/net_signing.h \ llmq/signing_shares.h \ llmq/snapshot.h \ llmq/types.h \ @@ -535,6 +536,7 @@ libbitcoin_node_a_SOURCES = \ llmq/dkgsessionhandler.cpp \ llmq/dkgsessionmgr.cpp \ llmq/ehf_signals.cpp \ + llmq/net_signing.cpp \ llmq/options.cpp \ llmq/quorums.cpp \ llmq/signhash.cpp \ diff --git a/src/evo/mnhftx.cpp b/src/evo/mnhftx.cpp index 54e5d0c29ce0b..b71e28f2e7b7a 100644 --- a/src/evo/mnhftx.cpp +++ b/src/evo/mnhftx.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include diff --git a/src/init.cpp b/src/init.cpp index 642df953f34f3..e1e1089f81e80 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -91,7 +91,7 @@ #include #include #include -#include +#include #include #include #include @@ -2200,6 +2200,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) RegisterValidationInterface(g_active_notification_interface.get()); } node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, *node.llmq_ctx->qman, chainman.ActiveChainstate())); + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman)); // ********************************************************* Step 7d: Setup other Dash services diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index e3c7b4911f1b4..00a21fc8c6b7d 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -30,7 +30,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CDeterministicMNManager& d qman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db, *quorum_block_processor, *qsnapman, mn_activeman, mn_sync, sporkman, db_params)}, - sigman{std::make_unique(chainman.ActiveChainstate(), *qman, db_params)}, + sigman{std::make_unique(*qman, db_params)}, clhandler{std::make_unique(chainman.ActiveChainstate(), *qman, sporkman, mempool, mn_sync)}, isman{std::make_unique(*clhandler, chainman.ActiveChainstate(), *sigman, sporkman, mempool, mn_sync, db_params)} @@ -44,19 +44,16 @@ LLMQContext::~LLMQContext() { } void LLMQContext::Interrupt() { - sigman->InterruptWorkerThread(); } void LLMQContext::Start(PeerManager& peerman) { qman->Start(); - sigman->StartWorkerThread(peerman); clhandler->Start(*isman); } void LLMQContext::Stop() { clhandler->Stop(); - sigman->StopWorkerThread(); qman->Stop(); } diff --git a/src/llmq/net_signing.cpp b/src/llmq/net_signing.cpp new file mode 100644 index 0000000000000..cf397f0ecf746 --- /dev/null +++ b/src/llmq/net_signing.cpp @@ -0,0 +1,149 @@ +// Copyright (c) 2025 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +void NetSigning::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +{ + if (msg_type != NetMsgType::QSIGREC) return; + + auto recoveredSig = std::make_shared(); + vRecv >> *recoveredSig; + + WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(pfrom.GetId(), + CInv{MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()})); + + if (!Params().GetLLMQ(recoveredSig->getLlmqType()).has_value()) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } + + m_sig_manager.VerifyAndProcessRecoveredSig(pfrom.GetId(), std::move(recoveredSig)); +} + +void NetSigning::Start() +{ + // can't start new thread if we have one running already + if (workThread.joinable()) { + assert(false); + } + + workThread = std::thread(&util::TraceThread, "recsigs", [this] { WorkThreadMain(); }); +} + +void NetSigning::Stop() +{ + // make sure to call InterruptWorkerThread() first + if (!workInterrupt) { + assert(false); + } + + if (workThread.joinable()) { + workThread.join(); + } +} + +void NetSigning::ProcessRecoveredSig(std::shared_ptr recoveredSig) +{ + if (!m_sig_manager.ProcessRecoveredSig(recoveredSig)) return; + + auto listeners = m_sig_manager.GetListeners(); + for (auto& l : listeners) { + m_peer_manager->PeerPostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); + } + + GetMainSignals().NotifyRecoveredSig(recoveredSig, recoveredSig->GetHash().ToString()); +} + +bool NetSigning::ProcessPendingRecoveredSigs() +{ + Uint256HashMap> pending{m_sig_manager.FetchPendingReconstructed()}; + + for (const auto& p : pending) { + ProcessRecoveredSig(p.second); + } + + std::unordered_map>> recSigsByNode; + std::unordered_map, CBLSPublicKey, StaticSaltedHasher> pubkeys; + + const size_t nMaxBatchSize{32}; + bool more_work = m_sig_manager.CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, pubkeys); + if (recSigsByNode.empty()) { + return false; + } + + // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not + // craftable by individual entities, making the rogue public key attack impossible + CBLSBatchVerifier batchVerifier(false, false); + + size_t verifyCount = 0; + for (const auto& [nodeId, v] : recSigsByNode) { + for (const auto& recSig : v) { + // we didn't verify the lazy signature until now + if (!recSig->sig.Get().IsValid()) { + batchVerifier.badSources.emplace(nodeId); + break; + } + + const auto& pubkey = pubkeys.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash())); + batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), pubkey); + verifyCount++; + } + } + + cxxtimer::Timer verifyTimer(true); + batchVerifier.Verify(); + verifyTimer.stop(); + + LogPrint(BCLog::LLMQ, "NetSigning::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, + verifyCount, verifyTimer.count(), recSigsByNode.size()); + + Uint256HashSet processed; + for (const auto& [nodeId, v] : recSigsByNode) { + if (batchVerifier.badSources.count(nodeId)) { + LogPrint(BCLog::LLMQ, "NetSigning::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); + m_peer_manager->PeerMisbehaving(nodeId, 100); + continue; + } + + for (const auto& recSig : v) { + if (!processed.emplace(recSig->GetHash()).second) { + continue; + } + + ProcessRecoveredSig(recSig); + } + } + + return more_work; +} + +void NetSigning::WorkThreadMain() +{ + while (!workInterrupt) { + bool fMoreWork = ProcessPendingRecoveredSigs(); + + constexpr auto CLEANUP_INTERVAL{5000ms}; + if (cleanupThrottler.TryCleanup(CLEANUP_INTERVAL)) { + m_sig_manager.Cleanup(); + } + + // TODO Wakeup when pending signing is needed? + if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { + return; + } + } +} diff --git a/src/llmq/net_signing.h b/src/llmq/net_signing.h new file mode 100644 index 0000000000000..72e4038dcda90 --- /dev/null +++ b/src/llmq/net_signing.h @@ -0,0 +1,47 @@ +// Copyright (c) 2025 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_LLMQ_NET_SIGNING_H +#define BITCOIN_LLMQ_NET_SIGNING_H + +#include + +#include +#include + +#include + +namespace llmq { +class CSigningManager; +} // namespace llmq + +class NetSigning final : public NetHandler +{ +public: + NetSigning(PeerManagerInternal* peer_manager, llmq::CSigningManager& sig_manager) : + NetHandler(peer_manager), + m_sig_manager(sig_manager) + { + workInterrupt.reset(); + } + void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) override; + + [[nodiscard]] bool ProcessPendingRecoveredSigs(); + void ProcessRecoveredSig(std::shared_ptr recoveredSig); + + void Start() override; + void Stop() override; + void Interrupt() override { workInterrupt(); }; + + void WorkThreadMain(); + +private: + llmq::CSigningManager& m_sig_manager; + + CleanupThrottler cleanupThrottler; + std::thread workThread; + CThreadInterrupt workInterrupt; +}; + +#endif // BITCOIN_LLMQ_NET_SIGNING_H diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 4a1ac7a6b5286..b852965bc5489 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -7,22 +7,14 @@ #include #include #include -#include -#include #include -#include #include -#include -#include -#include #include -#include -#include -#include -#include +#include #include +#include #include namespace llmq @@ -333,10 +325,8 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) ////////////////// -CSigningManager::CSigningManager(const CChainState& chainstate, const CQuorumManager& _qman, - const util::DbWrapperParams& db_params) : +CSigningManager::CSigningManager(const CQuorumManager& _qman, const util::DbWrapperParams& db_params) : db{db_params}, - m_chainstate{chainstate}, qman{_qman} { } @@ -370,55 +360,24 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS return true; } -static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan) +void CSigningManager::VerifyAndProcessRecoveredSig(NodeId from, std::shared_ptr recoveredSig) { - retBan = false; - - auto llmqType = recoveredSig.getLlmqType(); - if (!Params().GetLLMQ(llmqType).has_value()) { - retBan = true; - return false; - } - - auto quorum = quorum_manager.GetQuorum(llmqType, recoveredSig.getQuorumHash()); + auto llmq_type = recoveredSig->getLlmqType(); + auto quorum = qman.GetQuorum(llmq_type, recoveredSig->getQuorumHash()); if (!quorum) { - LogPrint(BCLog::LLMQ, "CSigningManager::%s -- quorum %s not found\n", __func__, - recoveredSig.getQuorumHash().ToString()); - return false; - } - if (!IsQuorumActive(llmqType, quorum_manager, quorum->qc->quorumHash)) { - return false; - } - - return true; -} - -MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) -{ - if (msg_type != NetMsgType::QSIGREC) { - return {}; + LogPrint(BCLog::LLMQ, "NetSigning::%s -- quorum %s not found\n", __func__, + recoveredSig->getQuorumHash().ToString()); + return; } - - auto recoveredSig = std::make_shared(); - vRecv >> *recoveredSig; - - MessageProcessingResult ret{}; - ret.m_to_erase = CInv{MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()}; - - bool ban = false; - if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) { - if (ban) { - ret.m_error = MisbehavingError{100}; - return ret; - } - return ret; + if (!IsQuorumActive(llmq_type, qman, quorum->qc->quorumHash)) { + return; } // It's important to only skip seen *valid* sig shares here. See comment for CBatchedSigShare // We don't receive recovered sigs in batches, but we do batched verification per node on these if (db.HasRecoveredSigForHash(recoveredSig->GetHash())) { - return ret; + return; } LogPrint(BCLog::LLMQ, "CSigningManager::%s -- signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__, @@ -429,17 +388,15 @@ MessageProcessingResult CSigningManager::ProcessMessage(NodeId from, std::string // no need to perform full verification LogPrint(BCLog::LLMQ, "CSigningManager::%s -- already pending reconstructed sig, signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__, recoveredSig->buildSignHash().ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString(), from); - return ret; + return; } - pendingRecoveredSigs[from].emplace_back(recoveredSig); - return ret; + pendingRecoveredSigs[from].emplace_back(std::move(recoveredSig)); } bool CSigningManager::CollectPendingRecoveredSigsToVerify( - size_t maxUniqueSessions, - std::unordered_map>>& retSigShares, - std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) + size_t maxUniqueSessions, std::unordered_map>>& retSigShares, + std::unordered_map, CBLSPublicKey, StaticSaltedHasher>& ret_pubkeys) { bool more_work{false}; @@ -477,16 +434,13 @@ bool CSigningManager::CollectPendingRecoveredSigsToVerify( !pendingReconstructedRecoveredSigs.empty(); } - for (auto& p : retSigShares) { - NodeId nodeId = p.first; - auto& v = p.second; - + for (auto& [nodeId, v] : retSigShares) { for (auto it = v.begin(); it != v.end();) { const auto& recSig = *it; auto llmqType = recSig->getLlmqType(); auto quorumKey = std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash()); - if (!retQuorums.count(quorumKey)) { + if (!ret_pubkeys.count(quorumKey)) { auto quorum = qman.GetQuorum(llmqType, recSig->getQuorumHash()); if (!quorum) { LogPrint(BCLog::LLMQ, "CSigningManager::%s -- quorum %s not found, node=%d\n", __func__, @@ -501,7 +455,7 @@ bool CSigningManager::CollectPendingRecoveredSigsToVerify( continue; } - retQuorums.emplace(quorumKey, quorum); + ret_pubkeys.emplace(quorumKey, quorum->qc->quorumPublicKey); } ++it; @@ -511,88 +465,20 @@ bool CSigningManager::CollectPendingRecoveredSigsToVerify( return more_work; } -void CSigningManager::ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman) +Uint256HashMap> CSigningManager::FetchPendingReconstructed() { - decltype(pendingReconstructedRecoveredSigs) m; - WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs)); - - for (const auto& p : m) { - ProcessRecoveredSig(p.second, peerman); - } -} - -bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) -{ - std::unordered_map>> recSigsByNode; - std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; - - ProcessPendingReconstructedRecoveredSigs(peerman); - - const size_t nMaxBatchSize{32}; - bool more_work = CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); - if (recSigsByNode.empty()) { - return false; - } - - // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not - // craftable by individual entities, making the rogue public key attack impossible - CBLSBatchVerifier batchVerifier(false, false); - - size_t verifyCount = 0; - for (const auto& p : recSigsByNode) { - NodeId nodeId = p.first; - const auto& v = p.second; - - for (const auto& recSig : v) { - // we didn't verify the lazy signature until now - if (!recSig->sig.Get().IsValid()) { - batchVerifier.badSources.emplace(nodeId); - break; - } - - const auto& quorum = quorums.at(std::make_pair(recSig->getLlmqType(), recSig->getQuorumHash())); - batchVerifier.PushMessage(nodeId, recSig->GetHash(), recSig->buildSignHash().Get(), recSig->sig.Get(), - quorum->qc->quorumPublicKey); - verifyCount++; - } - } - - cxxtimer::Timer verifyTimer(true); - batchVerifier.Verify(); - verifyTimer.stop(); - - LogPrint(BCLog::LLMQ, "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size()); - - Uint256HashSet processed; - for (const auto& p : recSigsByNode) { - NodeId nodeId = p.first; - const auto& v = p.second; - - if (batchVerifier.badSources.count(nodeId)) { - LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); - peerman.Misbehaving(nodeId, 100); - continue; - } - - for (const auto& recSig : v) { - if (!processed.emplace(recSig->GetHash()).second) { - continue; - } - - ProcessRecoveredSig(recSig, peerman); - } - } - - return more_work; + Uint256HashMap> tmp; + WITH_LOCK(cs_pending, swap(tmp, pendingReconstructedRecoveredSigs)); + return tmp; } // signature must be verified already -void CSigningManager::ProcessRecoveredSig(const std::shared_ptr& recoveredSig, PeerManager& peerman) +bool CSigningManager::ProcessRecoveredSig(const std::shared_ptr& recoveredSig) { auto llmqType = recoveredSig->getLlmqType(); if (db.HasRecoveredSigForHash(recoveredSig->GetHash())) { - return; + return false; } auto signHash = recoveredSig->buildSignHash(); @@ -614,7 +500,7 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash())); - auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners); - for (auto& l : listeners) { - peerman.PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); - } + return true; +} - GetMainSignals().NotifyRecoveredSig(recoveredSig, recoveredSig->GetHash().ToString()); +std::vector CSigningManager::GetListeners() const +{ + LOCK(cs_listeners); + return recoveredSigsListeners; } void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr& recoveredSig) @@ -646,11 +533,6 @@ void CSigningManager::TruncateRecoveredSig(Consensus::LLMQType llmqType, const u void CSigningManager::Cleanup() { - constexpr auto CLEANUP_INTERVAL{5000ms}; - if (!cleanupThrottler.TryCleanup(CLEANUP_INTERVAL)) { - return; - } - int64_t maxAge = gArgs.GetIntArg("-maxrecsigsage", DEFAULT_MAX_RECOVERED_SIGS_AGE); db.CleanupOldRecoveredSigs(maxAge); @@ -714,47 +596,6 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256& return db.GetVoteForId(llmqType, id, msgHashRet); } -void CSigningManager::StartWorkerThread(PeerManager& peerman) -{ - // can't start new thread if we have one running already - if (workThread.joinable()) { - assert(false); - } - - workThread = std::thread(&util::TraceThread, "recsigs", [this, &peerman] { WorkThreadMain(peerman); }); -} - -void CSigningManager::StopWorkerThread() -{ - // make sure to call InterruptWorkerThread() first - if (!workInterrupt) { - assert(false); - } - - if (workThread.joinable()) { - workThread.join(); - } -} - -void CSigningManager::InterruptWorkerThread() -{ - workInterrupt(); -} - -void CSigningManager::WorkThreadMain(PeerManager& peerman) -{ - while (!workInterrupt) { - bool fMoreWork = ProcessPendingRecoveredSigs(peerman); - - Cleanup(); - - // TODO Wakeup when pending signing is needed? - if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { - return; - } - } -} - SignHash CSigBase::buildSignHash() const { return SignHash(llmqType, quorumHash, id, msgHash); } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index d4149314c0119..5ffee265fced4 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -7,23 +7,16 @@ #include #include -#include #include #include -#include - #include #include #include #include -#include -#include - -#include +#include #include #include -#include #include class CChainState; @@ -31,7 +24,6 @@ class CDataStream; class CDBBatch; class CDBWrapper; class CInv; -class PeerManager; struct RPCResult; namespace util { struct DbWrapperParams; @@ -42,6 +34,7 @@ class UniValue; namespace llmq { class CQuorumManager; class CSigSharesManager; +class SignHash; // Keep recovered signatures for a week. This is a "-maxrecsigsage" option default. static constexpr int64_t DEFAULT_MAX_RECOVERED_SIGS_AGE{60 * 60 * 24 * 7}; @@ -59,9 +52,7 @@ class CSigBase CSigBase() = default; public: - [[nodiscard]] constexpr auto getLlmqType() const { - return llmqType; - } + [[nodiscard]] constexpr Consensus::LLMQType getLlmqType() const { return llmqType; } [[nodiscard]] constexpr auto getQuorumHash() const -> const uint256& { return quorumHash; @@ -160,6 +151,7 @@ class CRecoveredSigsListener public: virtual ~CRecoveredSigsListener() = default; + // TODO: simplify returned type to std::variant [[nodiscard]] virtual MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) = 0; }; @@ -168,7 +160,6 @@ class CSigningManager private: CRecoveredSigsDb db; - const CChainState& m_chainstate; const CQuorumManager& qman; mutable Mutex cs_pending; @@ -178,8 +169,6 @@ class CSigningManager FastRandomContext rnd GUARDED_BY(cs_pending); - CleanupThrottler cleanupThrottler; - mutable Mutex cs_listeners; std::vector recoveredSigsListeners GUARDED_BY(cs_listeners); @@ -187,14 +176,13 @@ class CSigningManager CSigningManager() = delete; CSigningManager(const CSigningManager&) = delete; CSigningManager& operator=(const CSigningManager&) = delete; - explicit CSigningManager(const CChainState& chainstate, const CQuorumManager& _qman, - const util::DbWrapperParams& db_params); + explicit CSigningManager(const CQuorumManager& _qman, const util::DbWrapperParams& db_params); ~CSigningManager(); bool AlreadyHave(const CInv& inv) const EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; - [[nodiscard]] MessageProcessingResult ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) + void VerifyAndProcessRecoveredSig(NodeId from, std::shared_ptr recovered_sig) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid @@ -208,20 +196,21 @@ class CSigningManager // DB. This allows AlreadyHave/late-share filtering to keep returning true. Cleanup will later remove the remains void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); -private: - bool CollectPendingRecoveredSigsToVerify( + // Used by NetSigning: + [[nodiscard]] Uint256HashMap> FetchPendingReconstructed() + EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); + [[nodiscard]] bool CollectPendingRecoveredSigsToVerify( size_t maxUniqueSessions, std::unordered_map>>& retSigShares, - std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) + std::unordered_map, CBLSPublicKey, StaticSaltedHasher>& ret_pubkeys) + EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); + [[nodiscard]] std::vector GetListeners() const EXCLUSIVE_LOCKS_REQUIRED(!cs_listeners); + // Returns true if recovered sigs should be send to listeners + [[nodiscard]] bool ProcessRecoveredSig(const std::shared_ptr& recoveredSig) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); - void ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); - bool ProcessPendingRecoveredSigs(PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); // called from the worker thread of CSigSharesManager +private: // Used by CSigSharesManager CRecoveredSigsDb& GetDb() { return db; } - void ProcessRecoveredSig(const std::shared_ptr& recoveredSig, PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); // Needed for access to GetDb() and ProcessRecoveredSig() friend class CSigSharesManager; @@ -239,16 +228,8 @@ class CSigningManager bool GetVoteForId(Consensus::LLMQType llmqType, const uint256& id, uint256& msgHashRet) const; -private: - std::thread workThread; - CThreadInterrupt workInterrupt; - void Cleanup(); // called from the worker thread of CSigSharesManager - void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending, !cs_listeners); - public: - void StartWorkerThread(PeerManager& peerman); - void StopWorkerThread(); - void InterruptWorkerThread(); + void Cleanup(); }; template diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index ef899d1b11347..4d454afe5fa2c 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -789,35 +789,47 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CQuorum } if (canTryRecovery) { - TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); + auto rs = TryRecoverSig(*quorum, sigShare.getId(), sigShare.getMsgHash()); + if (rs != nullptr) { + if (sigman.ProcessRecoveredSig(rs)) { + // TODO: remove duplicated code with NetSigning + auto listeners = sigman.GetListeners(); + for (auto& l : listeners) { + m_peerman.PostProcessMessage(l->HandleNewRecoveredSig(*rs)); + } + + GetMainSignals().NotifyRecoveredSig(rs, rs->GetHash().ToString()); + } + } } } -void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, const uint256& msgHash) +std::shared_ptr CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, + const uint256& msgHash) { if (sigman.HasRecoveredSigForId(quorum.params.type, id)) { - return; + return nullptr; } std::vector sigSharesForRecovery; std::vector idsForRecovery; - std::shared_ptr singleMemberRecoveredSig; { LOCK(cs); auto signHash = SignHash(quorum.params.type, quorum.qc->quorumHash, id, msgHash).Get(); const auto* sigSharesForSignHash = sigShares.GetAllForSignHash(signHash); if (sigSharesForSignHash == nullptr) { - return; + return nullptr; } + std::shared_ptr singleMemberRecoveredSig; if (quorum.params.is_single_member()) { if (sigSharesForSignHash->empty()) { LogPrint(BCLog::LLMQ_SIGS, /* Continued */ "CSigSharesManager::%s -- impossible to recover single-node signature - no shares yet. id=%s, " "msgHash=%s\n", __func__, id.ToString(), msgHash.ToString()); - return; + return nullptr; } const auto& sigShare = sigSharesForSignHash->begin()->second; CBLSSignature recoveredSig = sigShare.sigShare.Get(); @@ -838,14 +850,11 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, // check if we can recover the final signature if (sigSharesForRecovery.size() < size_t(quorum.params.threshold)) { - return; + return nullptr; + } + if (quorum.params.is_single_member()) { + return singleMemberRecoveredSig; // end of single-quorum processing } - } - - // Handle single-member quorum case after releasing the lock - if (singleMemberRecoveredSig) { - sigman.ProcessRecoveredSig(singleMemberRecoveredSig, m_peerman); - return; // end of single-quorum processing } // now recover it @@ -854,7 +863,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, if (!recoveredSig.Recover(sigSharesForRecovery, idsForRecovery)) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- failed to recover signature. id=%s, msgHash=%s, time=%d\n", __func__, id.ToString(), msgHash.ToString(), t.count()); - return; + return nullptr; } LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- recovered signature. id=%s, msgHash=%s, time=%d\n", __func__, @@ -872,11 +881,10 @@ void CSigSharesManager::TryRecoverSig(const CQuorum& quorum, const uint256& id, // this should really not happen as we have verified all signature shares before LogPrintf("CSigSharesManager::%s -- own recovered signature is invalid. id=%s, msgHash=%s\n", __func__, id.ToString(), msgHash.ToString()); - return; + return nullptr; } } - - sigman.ProcessRecoveredSig(rs, m_peerman); + return rs; } CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorum& quorum, const uint256 &id, int attempt) diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 24865fec98a81..ff0c81dbf0cc0 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -478,7 +479,8 @@ class CSigSharesManager : public CRecoveredSigsListener EXCLUSIVE_LOCKS_REQUIRED(!cs); void ProcessSigShare(const CSigShare& sigShare, const CQuorumCPtr& quorum) EXCLUSIVE_LOCKS_REQUIRED(!cs); - void TryRecoverSig(const CQuorum& quorum, const uint256& id, const uint256& msgHash) EXCLUSIVE_LOCKS_REQUIRED(!cs); + std::shared_ptr TryRecoverSig(const CQuorum& quorum, const uint256& id, const uint256& msgHash) + EXCLUSIVE_LOCKS_REQUIRED(!cs); bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo) EXCLUSIVE_LOCKS_REQUIRED(!cs); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9808fb9f646a7..9fbc81096950c 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -657,6 +657,8 @@ class PeerManagerImpl final : public PeerManager void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerAskPeersForTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void PeerPostProcessMessage(MessageProcessingResult&& ret) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + private: void _RelayTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); @@ -5434,7 +5436,6 @@ void PeerManagerImpl::ProcessMessage( PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); - PostProcessMessage(m_llmq_ctx->sigman->ProcessMessage(pfrom.GetId(), msg_type, vRecv), pfrom.GetId()); PostProcessMessage(ProcessPlatformBanMessage(pfrom.GetId(), msg_type, vRecv), pfrom.GetId()); if (msg_type == NetMsgType::CLSIG) { @@ -6560,3 +6561,8 @@ void PeerManagerImpl::PeerAskPeersForTransaction(const uint256& txid) { AskPeersForTransaction(txid); } + +void PeerManagerImpl::PeerPostProcessMessage(MessageProcessingResult&& ret) +{ + PostProcessMessage(std::move(ret), -1); +} diff --git a/src/net_processing.h b/src/net_processing.h index d15298401c246..de73b013dd8ed 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -64,6 +64,7 @@ class PeerManagerInternal virtual void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) = 0; virtual void PeerAskPeersForTransaction(const uint256& txid) = 0; + virtual void PeerPostProcessMessage(MessageProcessingResult&& ret) = 0; }; class NetHandler diff --git a/src/test/evo_islock_tests.cpp b/src/test/evo_islock_tests.cpp index 7dc76475941dc..4d40b139af58c 100644 --- a/src/test/evo_islock_tests.cpp +++ b/src/test/evo_islock_tests.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index c765c656ad765..0049ffc3d353e 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -4,12 +4,9 @@ #include #include -#include -#include #include #include