diff --git a/cabal.project b/cabal.project index b983c81b56..82cbf035ba 100644 --- a/cabal.project +++ b/cabal.project @@ -80,12 +80,19 @@ source-repository-package eras/byron/ledger/impl eras/byron/crypto --- Backported version of https://github.com/IntersectMBO/ouroboros-network/pull/5161 +allow-newer: + -- https://github.com/phadej/vec/issues/121 + , ral:QuickCheck + , fin:QuickCheck + , bin:QuickCheck + +-- Using https://github.com/IntersectMBO/ouroboros-network/tree/peras-staging/pr-5202-v2 source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: 1385b53cefb81e79553b6b0252537455833ea9c4 - --sha256: sha256-zZ7WsMfRs1fG16bmvI5vIh4fhQ8RGyEvYGLSWlrxpg0= + tag: 0db8669b67982cba755e80bf2e413527def41244 + --sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ= subdir: + ouroboros-network-protocols ouroboros-network-api ouroboros-network diff --git a/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md b/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md new file mode 100644 index 0000000000..9efe00d438 --- /dev/null +++ b/ouroboros-consensus-cardano/changelog.d/20250919_095938_thomas.bagrel_object_diffusion.md @@ -0,0 +1,23 @@ + + + + + +### Breaking + +- Added support for `NodeToNodeV_16` diff --git a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs index 52a7e4f910..98093f86c5 100644 --- a/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs +++ b/ouroboros-consensus-cardano/src/ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Node.hs @@ -430,6 +430,7 @@ instance Map.fromList $ [ (NodeToNodeV_14, CardanoNodeToNodeVersion2) , (NodeToNodeV_15, CardanoNodeToNodeVersion2) + , (NodeToNodeV_16, CardanoNodeToNodeVersion2) ] supportedNodeToClientVersions _ = diff --git a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs index c03e0e5179..7003a5ce8a 100644 --- a/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs +++ b/ouroboros-consensus-cardano/src/shelley/Ouroboros/Consensus/Shelley/Ledger/NetworkProtocolVersion.hs @@ -48,6 +48,7 @@ instance SupportedNetworkProtocolVersion (ShelleyBlock proto era) where Map.fromList [ (NodeToNodeV_14, ShelleyNodeToNodeVersion1) , (NodeToNodeV_15, ShelleyNodeToNodeVersion1) + , (NodeToNodeV_16, ShelleyNodeToNodeVersion1) ] supportedNodeToClientVersions _ = Map.fromList diff --git a/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md b/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md new file mode 100644 index 0000000000..8ba76bb8ff --- /dev/null +++ b/ouroboros-consensus-diffusion/changelog.d/20250919_095930_thomas.bagrel_object_diffusion.md @@ -0,0 +1,27 @@ + + + + + +### Breaking + +- Modify `Ouroboros.Consensus{.Node,.Node.Tracer,.Network.NodeToNode}` to wire-in PerasCertDiffusion similarly to other mini-protocols (e.g. TX-submission) + +### Non-Breaking + +- Update `Test.ThreadNet.Network` in `unstable-diffusion-testlib` accordingly to the changes made in `Ouroboros.Consensus.Network.NodeToNode` diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index bc66cf78d6..294aace61f 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -54,6 +54,7 @@ import qualified Data.ByteString.Lazy as BSL import Data.Hashable (Hashable) import Data.Int (Int64) import Data.Map.Strict (Map) +import qualified Data.Set as Set import Data.Void (Void) import qualified Network.Mux as Mux import Network.TypedProtocol.Codec @@ -68,6 +69,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client ) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient import Ouroboros.Consensus.MiniProtocol.ChainSync.Server +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert import Ouroboros.Consensus.Node.ExitPolicy import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Node.Run @@ -81,10 +86,6 @@ import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.Orphans () import Ouroboros.Network.Block ( Serialised (..) - , decodePoint - , decodeTip - , encodePoint - , encodeTip ) import Ouroboros.Network.BlockFetch import Ouroboros.Network.BlockFetch.Client @@ -124,6 +125,18 @@ import Ouroboros.Network.Protocol.KeepAlive.Client import Ouroboros.Network.Protocol.KeepAlive.Codec import Ouroboros.Network.Protocol.KeepAlive.Server import Ouroboros.Network.Protocol.KeepAlive.Type +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec + ( byteLimitsObjectDiffusion + , codecObjectDiffusion + , codecObjectDiffusionId + , timeLimitsObjectDiffusion + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( objectDiffusionInboundPeerPipelined + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound + ( objectDiffusionOutboundPeer + ) import Ouroboros.Network.Protocol.PeerSharing.Client ( PeerSharingClient , peerSharingClientPeer @@ -197,6 +210,15 @@ data Handlers m addr blk = Handlers NodeToNodeVersion -> ConnectionId addr -> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m () + , hPerasCertDiffusionClient :: + NodeToNodeVersion -> + ControlMessageSTM m -> + ConnectionId addr -> + PerasCertDiffusionInboundPipelined blk m () + , hPerasCertDiffusionServer :: + NodeToNodeVersion -> + ConnectionId addr -> + PerasCertDiffusionOutbound blk m () , hKeepAliveClient :: NodeToNodeVersion -> ControlMessageSTM m -> @@ -293,6 +315,22 @@ mkHandlers (mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool) (getMempoolWriter getMempool) version + , hPerasCertDiffusionClient = \version controlMessageSTM peer -> + objectDiffusionInbound + (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers)) + ( perasCertDiffusionMaxFifoLength miniProtocolParameters + , 10 -- TODO: see https://github.com/tweag/cardano-peras/issues/97 + , 10 -- TODO: see https://github.com/tweag/cardano-peras/issues/97 + ) + (makePerasCertPoolWriterFromChainDB $ getChainDB) + version + controlMessageSTM + , hPerasCertDiffusionServer = \version peer -> + objectDiffusionOutbound + (contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers)) + (perasCertDiffusionMaxFifoLength miniProtocolParameters) + (makePerasCertPoolReaderFromChainDB $ getChainDB) + version , hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng , hKeepAliveServer = \_version _peer -> keepAliveServer , hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM @@ -304,7 +342,7 @@ mkHandlers -------------------------------------------------------------------------------} -- | Node-to-node protocol codecs needed to run 'Handlers'. -data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs +data Codecs blk addr e m bCS bSCS bBF bSBF bTX bPCD bKA bPS = Codecs { cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS , cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS @@ -312,6 +350,7 @@ data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs , cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF , cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX + , cPerasCertDiffusionCodec :: Codec (PerasCertDiffusion blk) e m bPCD , cKeepAliveCodec :: Codec KeepAlive e m bKA , cPeerSharingCodec :: Codec (PeerSharing addr) e m bPS } @@ -339,49 +378,53 @@ defaultCodecs :: ByteString ByteString ByteString + ByteString defaultCodecs ccfg version encAddr decAddr nodeToNodeVersion = Codecs { cChainSyncCodec = codecChainSync enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) - (encodeTip (encodeRawHash p)) - (decodeTip (decodeRawHash p)) + enc + dec + enc + dec , cChainSyncCodecSerialised = codecChainSync enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) - (encodeTip (encodeRawHash p)) - (decodeTip (decodeRawHash p)) + enc + dec + enc + dec , cBlockFetchCodec = codecBlockFetch enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) + enc + dec , cBlockFetchCodecSerialised = codecBlockFetch enc dec - (encodePoint (encodeRawHash p)) - (decodePoint (decodeRawHash p)) + enc + dec , cTxSubmission2Codec = codecTxSubmission2 enc dec enc dec + , cPerasCertDiffusionCodec = + codecObjectDiffusion + enc + dec + enc + dec , cKeepAliveCodec = codecKeepAlive_v2 , cPeerSharingCodec = codecPeerSharing (encAddr nodeToNodeVersion) (decAddr nodeToNodeVersion) } where - p :: Proxy blk - p = Proxy - enc :: SerialiseNodeToNode blk a => a -> Encoding enc = encodeNodeToNode ccfg version @@ -401,6 +444,7 @@ identityCodecs :: (AnyMessage (BlockFetch blk (Point blk))) (AnyMessage (BlockFetch (Serialised blk) (Point blk))) (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) + (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing addr)) identityCodecs = @@ -410,6 +454,7 @@ identityCodecs = , cBlockFetchCodec = codecBlockFetchId , cBlockFetchCodecSerialised = codecBlockFetchId , cTxSubmission2Codec = codecTxSubmission2Id + , cPerasCertDiffusionCodec = codecObjectDiffusionId , cKeepAliveCodec = codecKeepAliveId , cPeerSharingCodec = codecPeerSharingId } @@ -432,6 +477,7 @@ data Tracers' peer ntnAddr blk e f = Tracers f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk)))) , tTxSubmission2Tracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk)))) + , tPerasCertDiffusionTracer :: f (TraceLabelPeer peer (TraceSendRecv (PerasCertDiffusion blk))) , tKeepAliveTracer :: f (TraceLabelPeer peer (TraceSendRecv KeepAlive)) , tPeerSharingTracer :: f (TraceLabelPeer peer (TraceSendRecv (PeerSharing ntnAddr))) } @@ -444,6 +490,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer ntnAddr blk e f , tBlockFetchTracer = f tBlockFetchTracer , tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer , tTxSubmission2Tracer = f tTxSubmission2Tracer + , tPerasCertDiffusionTracer = f tPerasCertDiffusionTracer , tKeepAliveTracer = f tKeepAliveTracer , tPeerSharingTracer = f tPeerSharingTracer } @@ -464,6 +511,7 @@ nullTracers = , tBlockFetchTracer = nullTracer , tBlockFetchSerialisedTracer = nullTracer , tTxSubmission2Tracer = nullTracer + , tPerasCertDiffusionTracer = nullTracer , tKeepAliveTracer = nullTracer , tPeerSharingTracer = nullTracer } @@ -485,6 +533,7 @@ showTracers tr = , tBlockFetchTracer = showTracing tr , tBlockFetchSerialisedTracer = showTracing tr , tTxSubmission2Tracer = showTracing tr + , tPerasCertDiffusionTracer = showTracing tr , tKeepAliveTracer = showTracing tr , tPeerSharingTracer = showTracing tr } @@ -509,7 +558,7 @@ type ServerApp m addr bytes a = -- | Applications for the node-to-node protocols -- -- See 'Network.Mux.Types.MuxApplication' -data Apps m addr bCS bBF bTX bKA bPS a b = Apps +data Apps m addr bCS bBF bTX bPCD bKA bPS a b = Apps { aChainSyncClient :: ClientApp m addr bCS a -- ^ Start a chain sync client that communicates with the given upstream -- node. @@ -525,6 +574,10 @@ data Apps m addr bCS bBF bTX bKA bPS a b = Apps -- given upstream node. , aTxSubmission2Server :: ServerApp m addr bTX b -- ^ Start a transaction submission v2 server. + , aPerasCertDiffusionClient :: ClientApp m addr bPCD a + -- ^ Start a Peras cert diffusion client. + , aPerasCertDiffusionServer :: ServerApp m addr bPCD b + -- ^ Start a Peras cert diffusion server. , aKeepAliveClient :: ClientApp m addr bKA a -- ^ Start a keep-alive client. , aKeepAliveServer :: ServerApp m addr bKA b @@ -540,7 +593,7 @@ data Apps m addr bCS bBF bTX bKA bPS a b = Apps -- -- They don't depend on the instantiation of the protocol parameters (which -- block type is used, etc.), hence the use of 'RankNTypes'. -data ByteLimits bCS bBF bTX bKA bPS = ByteLimits +data ByteLimits bCS bBF bTX bPCD bKA bPS = ByteLimits { blChainSync :: forall header point tip. ProtocolSizeLimits @@ -556,6 +609,11 @@ data ByteLimits bCS bBF bTX bKA bPS = ByteLimits ProtocolSizeLimits (TxSubmission2 txid tx) bTX + , blPerasCertDiffusion :: + forall blk. + ProtocolSizeLimits + (PerasCertDiffusion blk) + bPCD , blKeepAlive :: ProtocolSizeLimits KeepAlive @@ -567,22 +625,24 @@ data ByteLimits bCS bBF bTX bKA bPS = ByteLimits bPS } -noByteLimits :: ByteLimits bCS bBF bTX bKA bPS +noByteLimits :: ByteLimits bCS bBF bTX bPCD bKA bPS noByteLimits = ByteLimits { blChainSync = byteLimitsChainSync (const 0) , blBlockFetch = byteLimitsBlockFetch (const 0) , blTxSubmission2 = byteLimitsTxSubmission2 (const 0) + , blPerasCertDiffusion = byteLimitsObjectDiffusion (const 0) , blKeepAlive = byteLimitsKeepAlive (const 0) , blPeerSharing = byteLimitsPeerSharing (const 0) } -byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString +byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString ByteString byteLimits = ByteLimits { blChainSync = byteLimitsChainSync size , blBlockFetch = byteLimitsBlockFetch size , blTxSubmission2 = byteLimitsTxSubmission2 size + , blPerasCertDiffusion = byteLimitsObjectDiffusion size , blKeepAlive = byteLimitsKeepAlive size , blPeerSharing = byteLimitsPeerSharing size } @@ -594,7 +654,7 @@ byteLimits = -- | Construct the 'NetworkApplication' for the node-to-node protocols mkApps :: - forall m addrNTN addrNTC blk e bCS bBF bTX bKA bPS. + forall m addrNTN addrNTC blk e bCS bBF bTX bPCD bKA bPS. ( IOLike m , MonadTimer m , Ord addrNTN @@ -609,8 +669,8 @@ mkApps :: NodeKernel m addrNTN addrNTC blk -> StdGen -> Tracers m addrNTN blk e -> - (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bKA bPS) -> - ByteLimits bCS bBF bTX bKA bPS -> + (NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bPCD bKA bPS) -> + ByteLimits bCS bBF bTX bPCD bKA bPS -> -- Chain-Sync timeouts for chain-sync client (using `Header blk`) as well as -- the server (`SerialisedHeader blk`). (forall header. ProtocolTimeLimitsWithRnd (ChainSync header (Point blk) (Tip blk))) -> @@ -618,7 +678,7 @@ mkApps :: CsClient.CSJConfig -> ReportPeerMetrics m (ConnectionId addrNTN) -> Handlers m addrNTN blk -> - Apps m addrNTN bCS bBF bTX bKA bPS NodeToNodeInitiatorResult () + Apps m addrNTN bCS bBF bTX bPCD bKA bPS NodeToNodeInitiatorResult () mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucketConfig csjConfig ReportPeerMetrics{..} Handlers{..} = Apps{..} where @@ -797,6 +857,51 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke channel (txSubmissionServerPeerPipelined (hTxSubmissionServer version them)) + aPerasCertDiffusionClient :: + NodeToNodeVersion -> + ExpandedInitiatorContext addrNTN m -> + Channel m bPCD -> + m (NodeToNodeInitiatorResult, Maybe bPCD) + aPerasCertDiffusionClient + version + ExpandedInitiatorContext + { eicConnectionId = them + , eicControlMessage = controlMessageSTM + } + channel = do + labelThisThread "PerasCertDiffusionClient" + ((), trailing) <- + runPipelinedPeerWithLimits + (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) + (cPerasCertDiffusionCodec (mkCodecs version)) + blPerasCertDiffusion + timeLimitsObjectDiffusion + channel + ( objectDiffusionInboundPeerPipelined + (hPerasCertDiffusionClient version controlMessageSTM them) + ) + return (NoInitiatorResult, trailing) + + aPerasCertDiffusionServer :: + NodeToNodeVersion -> + ResponderContext addrNTN -> + Channel m bPCD -> + m ((), Maybe bPCD) + aPerasCertDiffusionServer + version + ResponderContext{rcConnectionId = them} + channel = do + labelThisThread "PerasCertDiffusionServer" + runPeerWithLimits + (TraceLabelPeer them `contramap` tPerasCertDiffusionTracer) + (cPerasCertDiffusionCodec (mkCodecs version)) + blPerasCertDiffusion + timeLimitsObjectDiffusion + channel + ( objectDiffusionOutboundPeer + (hPerasCertDiffusionServer version them) + ) + aKeepAliveClient :: NodeToNodeVersion -> ExpandedInitiatorContext addrNTN m -> @@ -900,10 +1005,11 @@ initiator :: MiniProtocolParameters -> NodeToNodeVersion -> NodeToNodeVersionData -> - Apps m addr b b b b b a c -> + Apps m addr b b b b b b a c -> OuroborosBundleWithExpandedCtx 'Mux.InitiatorMode addr b m a Void initiator miniProtocolParameters version versionData Apps{..} = nodeToNodeProtocols + Set.empty -- TODO: change for a meaningful value miniProtocolParameters -- TODO: currently consensus is using 'ConnectionId' for its 'peer' type. -- This is currently ok, as we might accept multiple connections from the @@ -918,6 +1024,9 @@ initiator miniProtocolParameters version versionData Apps{..} = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aBlockFetchClient version ctx))) , txSubmissionProtocol = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aTxSubmission2Client version ctx))) + , perasCertDiffusionProtocol = + (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aPerasCertDiffusionClient version ctx))) + , perasVoteDiffusionProtocol = error "perasVoteDiffusionProtocol not implemented" , keepAliveProtocol = (InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aKeepAliveClient version ctx))) , peerSharingProtocol = @@ -936,10 +1045,11 @@ initiatorAndResponder :: MiniProtocolParameters -> NodeToNodeVersion -> NodeToNodeVersionData -> - Apps m addr b b b b b a c -> + Apps m addr b b b b b b a c -> OuroborosBundleWithExpandedCtx 'Mux.InitiatorResponderMode addr b m a c initiatorAndResponder miniProtocolParameters version versionData Apps{..} = nodeToNodeProtocols + Set.empty -- TODO: change for a meaningful value miniProtocolParameters ( NodeToNodeProtocols { chainSyncProtocol = @@ -957,6 +1067,12 @@ initiatorAndResponder miniProtocolParameters version versionData Apps{..} = (MiniProtocolCb (\initiatorCtx -> aTxSubmission2Client version initiatorCtx)) (MiniProtocolCb (\responderCtx -> aTxSubmission2Server version responderCtx)) ) + , perasCertDiffusionProtocol = + ( InitiatorAndResponderProtocol + (MiniProtocolCb (\initiatorCtx -> aPerasCertDiffusionClient version initiatorCtx)) + (MiniProtocolCb (\responderCtx -> aPerasCertDiffusionServer version responderCtx)) + ) + , perasVoteDiffusionProtocol = error "perasVoteDiffusionProtocol not implemented" , keepAliveProtocol = ( InitiatorAndResponderProtocol (MiniProtocolCb (\initiatorCtx -> aKeepAliveClient version initiatorCtx)) diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index 44123d6f3a..ac9fb75450 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -655,6 +655,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = ByteString ByteString ByteString + ByteString NodeToNodeInitiatorResult () mkNodeToNodeApps nodeKernelArgs nodeKernel peerMetrics encAddrNTN decAddrNTN version = @@ -696,6 +697,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = ByteString ByteString ByteString + ByteString NodeToNodeInitiatorResult () ) -> diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs index 3d025ea91d..4b8627dbd9 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs @@ -42,6 +42,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Server import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server ( TraceLocalTxSubmissionServerEvent (..) ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert import Ouroboros.Consensus.Node.GSM (TraceGsmEvent) import Ouroboros.Consensus.Protocol.Praos.AgentClient ( KESAgentClientTrace (..) @@ -79,6 +80,10 @@ data Tracers' remotePeer localPeer blk f = Tracers f (TraceLabelPeer remotePeer (TraceTxSubmissionOutbound (GenTxId blk) (GenTx blk))) , localTxSubmissionServerTracer :: f (TraceLocalTxSubmissionServerEvent blk) , mempoolTracer :: f (TraceEventMempool blk) + , perasCertDiffusionInboundTracer :: + f (TraceLabelPeer remotePeer (TracePerasCertDiffusionInbound blk)) + , perasCertDiffusionOutboundTracer :: + f (TraceLabelPeer remotePeer (TracePerasCertDiffusionOutbound blk)) , forgeTracer :: f (TraceLabelCreds (TraceForgeEvent blk)) , blockchainTimeTracer :: f (TraceBlockchainTimeEvent UTCTime) , forgeStateInfoTracer :: f (TraceLabelCreds (ForgeStateInfo blk)) @@ -109,6 +114,8 @@ instance , txOutboundTracer = f txOutboundTracer , localTxSubmissionServerTracer = f localTxSubmissionServerTracer , mempoolTracer = f mempoolTracer + , perasCertDiffusionInboundTracer = f perasCertDiffusionInboundTracer + , perasCertDiffusionOutboundTracer = f perasCertDiffusionOutboundTracer , forgeTracer = f forgeTracer , blockchainTimeTracer = f blockchainTimeTracer , forgeStateInfoTracer = f forgeStateInfoTracer @@ -146,6 +153,8 @@ nullTracers = , txOutboundTracer = nullTracer , localTxSubmissionServerTracer = nullTracer , mempoolTracer = nullTracer + , perasCertDiffusionInboundTracer = nullTracer + , perasCertDiffusionOutboundTracer = nullTracer , forgeTracer = nullTracer , blockchainTimeTracer = nullTracer , forgeStateInfoTracer = nullTracer @@ -185,6 +194,8 @@ showTracers tr = , txOutboundTracer = showTracing tr , localTxSubmissionServerTracer = showTracing tr , mempoolTracer = showTracing tr + , perasCertDiffusionInboundTracer = showTracing tr + , perasCertDiffusionOutboundTracer = showTracing tr , forgeTracer = showTracing tr , blockchainTimeTracer = showTracing tr , forgeStateInfoTracer = showTracing tr diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 8a58fe3b03..f5e9dbecbd 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -83,6 +83,7 @@ import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCheck as HistoricityCheck import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert (PerasCertDiffusion) import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy import qualified Ouroboros.Consensus.Node.GSM as GSM @@ -123,8 +124,8 @@ import Ouroboros.Network.NodeToNode ( ConnectionId (..) , ExpandedInitiatorContext (..) , IsBigLedgerPeer (..) - , MiniProtocolParameters (..) , ResponderContext (..) + , defaultMiniProtocolParameters ) import Ouroboros.Network.PeerSelection.Governor ( makePublicPeerSelectionStateVar @@ -1056,13 +1057,7 @@ runThreadNetwork , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , keepAliveRng = kaRng , peerSharingRng = psRng - , miniProtocolParameters = - MiniProtocolParameters - { chainSyncPipeliningHighMark = 4 - , chainSyncPipeliningLowMark = 2 - , blockFetchPipeliningMax = 10 - , txSubmissionMaxUnacked = 1000 -- TODO ? - } + , miniProtocolParameters = defaultMiniProtocolParameters , blockFetchConfiguration = BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1 @@ -1188,6 +1183,7 @@ runThreadNetwork Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) + (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing NodeId)) customNodeToNodeCodecs cfg ntnVersion = @@ -1207,6 +1203,9 @@ runThreadNetwork , cTxSubmission2Codec = mapFailureCodec CodecIdFailure $ NTN.cTxSubmission2Codec NTN.identityCodecs + , cPerasCertDiffusionCodec = + mapFailureCodec CodecIdFailure $ + NTN.cPerasCertDiffusionCodec NTN.identityCodecs , cKeepAliveCodec = mapFailureCodec CodecIdFailure $ NTN.cKeepAliveCodec NTN.identityCodecs @@ -1797,6 +1796,7 @@ type LimitedApp' m addr blk = Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) + (AnyMessage (PerasCertDiffusion blk)) (AnyMessage KeepAlive) (AnyMessage (PeerSharing addr)) NodeToNodeInitiatorResult diff --git a/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs b/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs index a455689110..34d0c567a9 100644 --- a/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs +++ b/ouroboros-consensus-diffusion/test/mock-test/Test/ThreadNet/BFT.hs @@ -103,6 +103,7 @@ prop_simple_bft_convergence , version = newestVersion (Proxy @MockBftBlock) } + testOutput :: TestOutput MockBftBlock testOutput = runTestNetwork testConfig diff --git a/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md b/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md new file mode 100644 index 0000000000..6d34c05836 --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250918_104810_thomas.bagrel_object_diffusion.md @@ -0,0 +1,28 @@ + + + + + +### Breaking + +- Rely on a new version of `ouroboros-network` with support for ObjectDiffusion mini-protocol +- Add modules `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion{.Inbound,.Outbound}` with implementations of the ObjectDiffusion protocol (quite similar/inspired from TX-submission, except that client = inbound, server = outbound) +- Add module `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API` defining `ObjectPool{Reader,Writer}` interfaces, through which ObjectDiffusion accesses/stores the objects to send/that have been received. +- Add modules `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert` and `Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert` containing definitions specific to `PerasCert` diffusion through the ObjectDiffusion mini-protocol +- Modify `Ouroboros.Consensus.Node.Serialisation` to add CBOR serialisation (`SerialiseNodeToNode`) for `Point blk`, `Tip blk`, and `PerasCert blk` +- Add modules `Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke` and `Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke` with smoke tests for the general ObjectDiffusion mini-protocol and for the `PerasCert`-specific instance of it diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 50da4ba99b..a0d3a83786 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -191,6 +191,11 @@ library Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound + Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert Ouroboros.Consensus.Node.GsmState Ouroboros.Consensus.Node.InitStorage Ouroboros.Consensus.Node.NetworkProtocolVersion @@ -664,6 +669,8 @@ test-suite consensus-test Test.Consensus.MiniProtocol.ChainSync.CSJ Test.Consensus.MiniProtocol.ChainSync.Client Test.Consensus.MiniProtocol.LocalStateQuery.Server + Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke + Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke Test.Consensus.Peras.WeightSnapshot Test.Consensus.Util.MonadSTM.NormalForm Test.Consensus.Util.Versioned diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs new file mode 100644 index 0000000000..4208398f24 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs @@ -0,0 +1,492 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + +-- | Defines the inbound side of the ObjectDiffusion mini-protocol. +-- This protocol is inspired from TxSubmission and aims to provide a generic +-- way to diffuse objects for Peras, i.e. Peras votes and certificates. +-- +-- See the design document here: https://tweag.github.io/cardano-peras/peras-design.pdf +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + ( objectDiffusionInbound + , TraceObjectDiffusionInbound (..) + , ObjectDiffusionInboundError (..) + , NumObjectsProcessed (..) + ) where + +import Cardano.Prelude (catMaybes, (&)) +import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked +import Control.Exception (assert) +import Control.Monad (when) +import Control.Monad.Class.MonadSTM +import Control.Monad.Class.MonadThrow +import Control.Tracer (Tracer, traceWith) +import Data.Data (Typeable) +import Data.Foldable as Foldable (foldl', toList) +import Data.List qualified as List +import Data.List.NonEmpty qualified as NonEmpty +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.Sequence.Strict (StrictSeq) +import Data.Sequence.Strict qualified as Seq +import Data.Set (Set) +import Data.Set qualified as Set +import Data.Word (Word64) +import GHC.Generics (Generic) +import Network.TypedProtocol.Core (N (Z), Nat (..), natToInt) +import NoThunks.Class (NoThunks (..)) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.Util.NormalForm.Invariant (noThunksInvariant) +import Ouroboros.Network.ControlMessage +import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +newtype NumObjectsProcessed + = NumObjectsProcessed + { getNumObjectsProcessed :: Word64 + } + deriving (Eq, Show) + +data TraceObjectDiffusionInbound objectId object + = -- | Number of objects just about to be inserted. + TraceObjectDiffusionInboundCollectedObjects Int + | -- | Just processed object pass/fail breakdown. + TraceObjectDiffusionInboundAddedObjects NumObjectsProcessed + | -- | Received a 'ControlMessage' from the outbound peer governor, and about + -- to act on it. + TraceObjectDiffusionInboundRecvControlMessage ControlMessage + | TraceObjectDiffusionInboundCanRequestMoreObjects Int + | TraceObjectDiffusionInboundCannotRequestMoreObjects Int + deriving (Eq, Show) + +data ObjectDiffusionInboundError objectId object + = ProtocolErrorObjectsDifferentThanRequested (Set objectId) (Set objectId) + | ProtocolErrorObjectIdsNotRequested Int Int + | ProtocolErrorObjectIdsAlreadyKnown (Set objectId) + | ProtocolErrorObjectIdsDuplicate (Map objectId Int) + deriving Show + +instance + (Show objectId, Typeable object, Typeable objectId) => + Exception (ObjectDiffusionInboundError objectId object) + where + displayException (ProtocolErrorObjectsDifferentThanRequested reqButNotRecvd recvButNotReq) = + "The peer replied with different objects than those we asked for: " + ++ "requested but didn't receive " + ++ show reqButNotRecvd + ++ "; received but didn't requested " + ++ show recvButNotReq + displayException (ProtocolErrorObjectIdsNotRequested expected actual) = + "The peer replied with more objectIds than we asked for: expected " + ++ show expected + ++ " , received " + ++ show actual + displayException (ProtocolErrorObjectIdsAlreadyKnown offenders) = + "The peer replied with some objectIds that it has already sent us previously: " + ++ show offenders + displayException (ProtocolErrorObjectIdsDuplicate offenders) = + "The peer replied with a batch of objectIds containing duplicates: " + ++ show offenders + +-- | Information maintained internally in the 'objectDiffusionInbound' +-- implementation. +data InboundSt objectId object = InboundSt + { numIdsInFlight :: !NumObjectIdsReq + -- ^ The number of object identifiers that we have requested but + -- which have not yet been replied to. We need to track this to keep + -- our requests within the limit on the 'outstandingFifo' size. + , outstandingFifo :: !(StrictSeq objectId) + -- ^ This mirrors the queue of objects that the outbound peer has available + -- for us. Objects are kept in the order in which the outbound peer + -- advertised them to us. This is the same order in which we submit them to + -- the objectPool. It is also the order we acknowledge them. + , canRequestNext :: !(Set objectId) + -- ^ The objectIds that we can request. These are a subset of the + -- 'outstandingFifo' that we have not yet requested or not have in the pool + -- already. This is not ordered to illustrate the fact that we can + -- request objects out of order. + , pendingObjects :: !(Map objectId (Maybe object)) + -- ^ Objects we have successfully downloaded (or decided intentionally to + -- skip download) but have not yet added to the objectPool or acknowledged. + -- + -- Object IDs in this 'Map' are mapped to 'Nothing' if we notice that + -- they are already in the objectPool. That way we can skip requesting them + -- from the outbound peer, but still acknowledge them when the time comes. + , numToAckOnNextReq :: !NumObjectIdsAck + -- ^ The number of objects we can acknowledge on our next request + -- for more object IDs. Their corresponding IDs have already been removed + -- from 'outstandingFifo'. + } + deriving stock (Show, Generic) + deriving anyclass NoThunks + +initialInboundSt :: InboundSt objectId object +initialInboundSt = InboundSt 0 Seq.empty Set.empty Map.empty 0 + +objectDiffusionInbound :: + forall objectId object m. + ( Ord objectId + , Show objectId + , Typeable objectId + , Typeable object + , NoThunks objectId + , NoThunks object + , MonadSTM m + , MonadThrow m + ) => + Tracer m (TraceObjectDiffusionInbound objectId object) -> + -- | Maximum values for outstanding FIFO length, number of IDs to request, + -- and number of objects to request + (NumObjectsOutstanding, NumObjectIdsReq, NumObjectsReq) -> + ObjectPoolWriter objectId object m -> + NodeToNodeVersion -> + ControlMessageSTM m -> + ObjectDiffusionInboundPipelined objectId object m () +objectDiffusionInbound + tracer + (maxFifoLength, maxNumIdsToReq, maxNumObjectsToReq) + ObjectPoolWriter{..} + _version + controlMessageSTM = + ObjectDiffusionInboundPipelined $! + checkState initialInboundSt & go Zero + where + canRequestMoreObjects :: InboundSt k object -> Bool + canRequestMoreObjects !st = + not (Set.null (canRequestNext st)) + + -- Computes how many new IDs we can request so that receiving all of them + -- won't make 'outstandingFifo' exceed 'maxFifoLength'. + numIdsToReq :: InboundSt objectId object -> NumObjectIdsReq + numIdsToReq !st = + maxNumIdsToReq + `min` ( fromIntegral maxFifoLength + - (fromIntegral $ Seq.length $ outstandingFifo st) + - numIdsInFlight st + ) + + -- Updates 'InboundSt' with new object IDs and return the updated 'InboundSt'. + -- + -- Collected object IDs that are already in the objectPool are pre-emptively + -- acknowledged so that we don't need to bother requesting them from the + -- outbound peer. + preAcknowledge :: + InboundSt objectId object -> + (objectId -> Bool) -> + [objectId] -> + InboundSt objectId object + preAcknowledge !st _ collectedIds | null collectedIds = st + preAcknowledge !st poolHasObject collectedIds = + let + -- Divide the collected IDs in two parts: those that are already in the + -- objectPool and those that are not. + (alreadyObtained, notYetObtained) = + List.partition + poolHasObject + collectedIds + + -- The objects that we intentionally don't request, because they are + -- already in the objectPool, will need to be acknowledged. + -- So we extend 'pendingObjects' with those objects (so of course they + -- have no corresponding reply). + pendingObjects' = + foldl' + (\accMap objectId -> Map.insert objectId Nothing accMap) + (pendingObjects st) + alreadyObtained + + -- We initially extend 'outstandingFifo' with the all the collected IDs + -- (to properly mirror the server state). + outstandingFifo' = outstandingFifo st <> Seq.fromList collectedIds + + -- Now check if the update of 'pendingObjects' let us acknowledge a prefix + -- of the 'outstandingFifo', as we do in 'goCollect' -> 'CollectObjects'. + (objectIdsToAck, outstandingFifo'') = + Seq.spanl (`Map.member` pendingObjects') outstandingFifo' + + -- If so we can remove them from the 'pendingObjects' structure. + -- + -- Note that unlike in TX-Submission, we made sure the outstanding FIFO + -- couldn't have duplicate IDs, so we don't have to worry about re-adding + -- the duplicate IDs to 'pendingObjects' for future acknowledgment. + pendingObjects'' = + Foldable.foldl' + (flip Map.delete) + pendingObjects' + objectIdsToAck + + !st' = + st + { canRequestNext = canRequestNext st <> (Set.fromList notYetObtained) + , pendingObjects = pendingObjects'' + , outstandingFifo = outstandingFifo'' + , numToAckOnNextReq = + numToAckOnNextReq st + + fromIntegral (Seq.length objectIdsToAck) + } + in + st' + + go :: + forall (n :: N). + Nat n -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + go n !st = WithEffect $ do + -- Check whether we should continue engaging in the protocol. + ctrlMsg <- atomically controlMessageSTM + traceWith tracer $ + TraceObjectDiffusionInboundRecvControlMessage ctrlMsg + case ctrlMsg of + -- The peer selection governor is asking us to terminate the connection. + Terminate -> + pure $! terminateAfterDrain n + -- Otherwise, we can continue the protocol normally. + _continue -> case n of + -- We didn't pipeline any requests, so there are no replies in flight + -- (nothing to collect) + Zero -> do + if canRequestMoreObjects st + then do + -- There are no replies in flight, but we do know some more objects + -- we can ask for, so lets ask for them and more objectIds in a + -- pipelined way. + traceWith tracer $ + TraceObjectDiffusionInboundCanRequestMoreObjects (natToInt n) + pure $! checkState st & goReqObjectsAndObjectIdsPipelined Zero + else do + -- There's no replies in flight, and we have no more objects we can + -- ask for so the only remaining thing to do is to ask for more + -- objectIds. Since this is the only thing to do now, we make this a + -- blocking call. + traceWith tracer $ + TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n) + pure $! checkState st & goReqObjectIdsBlocking + + -- We have pipelined some requests, so there are some replies in flight. + Succ n' -> + if canRequestMoreObjects st + then do + -- We have replies in flight and we should eagerly collect them if + -- available, but there are objects to request too so we + -- should *not* block waiting for replies. + -- So we ask for new objects and objectIds in a pipelined way. + traceWith tracer $ + TraceObjectDiffusionInboundCanRequestMoreObjects (natToInt n) + pure $! + CollectPipelined + (Just (checkState st & goReqObjectsAndObjectIdsPipelined (Succ n'))) + (\collected -> checkState st & goCollect n' collected) + else do + traceWith tracer $ + TraceObjectDiffusionInboundCannotRequestMoreObjects (natToInt n) + -- In this case we can theoretically only collect replies or request + -- new object IDs. + -- + -- But it's important not to pipeline more requests for objectIds now + -- because if we did, then immediately after sending the request (but + -- having not yet received a response to either this or the other + -- pipelined requests), we would directly re-enter this code path, + -- resulting us in filling the pipeline with an unbounded number of + -- requests. + -- + -- So we instead block until we collect a reply. + pure $! + CollectPipelined + Nothing + (\collected -> checkState st & goCollect n' collected) + + goCollect :: + forall (n :: N). + Nat n -> + Collect objectId object -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + goCollect n collect !st = case collect of + CollectObjectIds numIdsRequested collectedIds -> WithEffect $ do + let numCollectedIds = length collectedIds + collectedIdsMap = Map.fromListWith (+) [(x, 1 :: Int) | x <- collectedIds] + + -- Check they didn't send more than we asked for. We don't need to + -- check for a minimum: the blocking case checks for non-zero + -- elsewhere, and for the non-blocking case it is quite normal for + -- them to send us none. + when (numCollectedIds > fromIntegral numIdsRequested) $ + throwIO + ( ProtocolErrorObjectIdsNotRequested @objectId @object + (fromIntegral numIdsRequested) + numCollectedIds + ) + + -- Check that the server didn't send duplicate IDs in its response + when (Map.size collectedIdsMap /= numCollectedIds) $ + throwIO + ( ProtocolErrorObjectIdsDuplicate @objectId @object $ + Map.filter (> 1) $ + collectedIdsMap + ) + + -- Check that the server didn't send IDs that were already in the + -- outstanding FIFO + let alreadyKnownIds = Seq.filter (`Map.member` collectedIdsMap) (outstandingFifo st) + when (not (null alreadyKnownIds)) $ + throwIO + ( ProtocolErrorObjectIdsAlreadyKnown @objectId @object $ + Set.fromList (toList alreadyKnownIds) + ) + + -- We extend our outstanding FIFO with the newly received objectIds by + -- calling 'preAcknowledge' which will also pre-emptively acknowledge the + -- objectIds that we already have in the pool and thus don't need to + -- request. + let !st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested} + poolHasObject <- atomically $ opwHasObject + let !st'' = preAcknowledge st' poolHasObject collectedIds + pure $! checkState st'' & go n + CollectObjects requestedIds collectedObjects -> WithEffect $ do + let requestedIdsSet = Set.fromList requestedIds + obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects) + + -- To start with we have to verify that the objects they have sent us are + -- exactly the objects we asked for, not more, not less. + when (requestedIdsSet /= obtainedIdsSet) $ + let reqButNotRecvd = requestedIdsSet `Set.difference` obtainedIdsSet + recvButNotReq = obtainedIdsSet `Set.difference` requestedIdsSet + in throwIO + ( ProtocolErrorObjectsDifferentThanRequested @objectId @object + reqButNotRecvd + recvButNotReq + ) + + traceWith tracer $ + TraceObjectDiffusionInboundCollectedObjects (length collectedObjects) + + -- We update 'pendingObjects' with the newly obtained objects + let pendingObjects' = + foldl' + (\accMap object -> Map.insert (opwObjectId object) (Just object) accMap) + (pendingObjects st) + collectedObjects + + -- We then find the longest prefix of 'outstandingFifo' for which we have + -- all the corresponding IDs in 'pendingObjects'. + -- We remove this prefix from 'outstandingFifo'. + (objectIdsToAck, outstandingFifo') = + Seq.spanl (`Map.member` pendingObjects') (outstandingFifo st) + + -- And also remove these entries from 'pendingObjects'. + -- + -- Note that unlike in TX-Submission, we made sure the outstanding FIFO + -- couldn't have duplicate IDs, so we don't have to worry about re-adding + -- the duplicate IDs to 'pendingObjects' for future acknowledgment. + pendingObjects'' = + Foldable.foldl' + (flip Map.delete) + pendingObjects' + objectIdsToAck + + -- These are the objects we need to submit to the object pool + objectsToAck = + catMaybes $ + (((Map.!) pendingObjects') <$> toList objectIdsToAck) + + opwAddObjects objectsToAck + traceWith tracer $ + TraceObjectDiffusionInboundAddedObjects + (NumObjectsProcessed (fromIntegral $ length objectsToAck)) + + let !st' = + st + { pendingObjects = pendingObjects'' + , outstandingFifo = outstandingFifo' + , numToAckOnNextReq = + numToAckOnNextReq st + + fromIntegral (Seq.length objectIdsToAck) + } + pure $! checkState st' & go n + + goReqObjectIdsBlocking :: + InboundSt objectId object -> + InboundStIdle 'Z objectId object m () + goReqObjectIdsBlocking !st = + let numIdsToRequest = numIdsToReq st + -- We should only request new object IDs in a blocking way if we have + -- absolutely nothing else we can do. + !st' = + st + { numToAckOnNextReq = 0 + , numIdsInFlight = numIdsToRequest + } + in assert + ( numIdsInFlight st == 0 + && Seq.null (outstandingFifo st) + && Set.null (canRequestNext st) + && Map.null (pendingObjects st) + ) + $ SendMsgRequestObjectIdsBlocking + (numToAckOnNextReq st) + numIdsToRequest + ( \neCollectedIds -> + checkState st' & goCollect Zero (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) + ) + + goReqObjectsAndObjectIdsPipelined :: + forall (n :: N). + Nat n -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + goReqObjectsAndObjectIdsPipelined n !st = + -- TODO: This implementation is deliberately naive, we pick in an + -- arbitrary order. We may want to revisit this later. + let (toRequest, canRequestNext') = + Set.splitAt (fromIntegral maxNumObjectsToReq) (canRequestNext st) + !st' = st{canRequestNext = canRequestNext'} + in SendMsgRequestObjectsPipelined + (toList toRequest) + (checkState st' & goReqObjectIdsPipelined (Succ n)) + + goReqObjectIdsPipelined :: + forall (n :: N). + Nat n -> + InboundSt objectId object -> + InboundStIdle n objectId object m () + goReqObjectIdsPipelined n !st = + let numIdsToRequest = numIdsToReq st + in if numIdsToRequest <= 0 + then checkState st & go n + else + let !st' = + st + { numIdsInFlight = + numIdsInFlight st + + numIdsToRequest + , numToAckOnNextReq = 0 + } + in SendMsgRequestObjectIdsPipelined + (numToAckOnNextReq st) + numIdsToRequest + (checkState st' & go (Succ n)) + + -- Ignore all outstanding replies to messages we pipelined ("drain"), and then + -- terminate. + terminateAfterDrain :: + Nat n -> InboundStIdle n objectId object m () + terminateAfterDrain = \case + Zero -> SendMsgDone () + Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n + +-- | Helper to ensure that the `InboundSt` is free of unexpected thunks and +-- stays strict during the whole process +checkState :: NoThunks s => s -> s +checkState !st = checkInvariant (noThunksInvariant st) st diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs new file mode 100644 index 0000000000..a41e0cf675 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs @@ -0,0 +1,67 @@ +-- | API for reading from and writing to object pools in the ObjectDiffusion +-- miniprotocol. +-- +-- The underlying object pool can be any database, such as a 'PerasCertDb' in +-- Peras certificate diffusion. +-- +-- 'ObjectPoolReader' is used on the outbound side of the protocol. Objects in +-- the pool are ordered by a strictly increasing ticket number ('ticketNo'), +-- which represents their time of arrival. Ticket numbers are local to each +-- node, unlike object IDs, which are global. Object IDs are not used for +-- ordering, since objects may arrive slightly out of order from peers. +-- +-- To read from the pool, one requests objects with a ticket number strictly +-- greater than the last known one. 'oprZeroTicketNo' provides an initial ticket +-- number for the first request. +-- +-- 'ObjectPoolWriter' is used on the inbound side of the protocol. It allows +-- checking whether an object is already present (to avoid re-requesting it) and +-- appending new objects. Ticket numbers are not part of the inbound interface, +-- but are used internally: newly added objects always receive a ticket number +-- strictly greater than those of older ones. +-- +-- This API design is inspired by 'MempoolSnapshot' from the TX-submission +-- miniprotocol, see: +-- +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + ( ObjectPoolReader (..) + , ObjectPoolWriter (..) + ) where + +import Control.Concurrent.Class.MonadSTM.Strict (STM) +import Data.Map (Map) +import Data.Word (Word64) + +-- | Interface used by the outbound side of object diffusion as its source of +-- objects to give to the remote side. +data ObjectPoolReader objectId object ticketNo m + = ObjectPoolReader + { oprObjectId :: object -> objectId + -- ^ Return the id of the specified object + , oprZeroTicketNo :: ticketNo + -- ^ Ticket number before the first item in the pool. + , oprObjectsAfter :: ticketNo -> Word64 -> STM m (Maybe (m (Map ticketNo object))) + -- ^ Get the map of objects available in the pool with a 'ticketNo' greater + -- than the specified one, with the number of returned objects capped to the + -- specified limit. + -- Returns 'Nothing' if there are no such objects available atm in the pool, + -- or 'Just' a monadic action yielding the map of objects otherwise. + -- The monadic action is there to account for the fact that objects might be + -- stored on disk. + -- If the monadic action is executed immediately, the returned map will be + -- non-empty. But if the consumer delays executing the action too much, + -- it is possible that the objects get garbage-collected in the meantime, and + -- thus the returned map could be empty. + } + +-- | Interface used by the inbound side of object diffusion when receiving +-- objects. +data ObjectPoolWriter objectId object m + = ObjectPoolWriter + { opwObjectId :: object -> objectId + -- ^ Return the id of the specified object + , opwAddObjects :: [object] -> m () + -- ^ Add a batch of objects to the objectPool. + , opwHasObject :: STM m (objectId -> Bool) + -- ^ Check if the object pool contains an object with the given id + } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs new file mode 100644 index 0000000000..26bff506f3 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs @@ -0,0 +1,114 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE StandaloneDeriving #-} + +-- | Instantiate 'ObjectPoolReader' and 'ObjectPoolWriter' using Peras +-- certificates from the 'PerasCertDB' (or the 'ChainDB' which is wrapping the +-- 'PerasCertDB'). +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert + ( makePerasCertPoolReaderFromCertDB + , makePerasCertPoolWriterFromCertDB + , makePerasCertPoolReaderFromChainDB + , makePerasCertPoolWriterFromChainDB + ) where + +import Data.Map (Map) +import qualified Data.Map as Map +import GHC.Exception (throw) +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB +import Ouroboros.Consensus.Storage.PerasCertDB.API + ( PerasCertDB + , PerasCertSnapshot + , PerasCertTicketNo + ) +import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB +import Ouroboros.Consensus.Util.IOLike + +-- | TODO: replace by `Data.Map.take` as soon as we move to GHC 9.8 +takeAscMap :: Int -> Map k v -> Map k v +takeAscMap n = Map.fromDistinctAscList . take n . Map.toAscList + +makePerasCertPoolReaderFromSnapshot :: + (IOLike m, StandardHash blk) => + STM m (PerasCertSnapshot blk) -> + ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m +makePerasCertPoolReaderFromSnapshot getCertSnapshot = + ObjectPoolReader + { oprObjectId = getPerasCertRound + , oprZeroTicketNo = PerasCertDB.zeroPerasCertTicketNo + , oprObjectsAfter = \lastKnown limit -> do + certSnapshot <- getCertSnapshot + let certsAfterLastKnown = + PerasCertDB.getCertsAfter certSnapshot lastKnown + let loadCertsAfterLastKnown = + pure (getPerasCert <$> takeAscMap (fromIntegral limit) certsAfterLastKnown) + pure $ + if Map.null certsAfterLastKnown + then Nothing + else Just loadCertsAfterLastKnown + } + +makePerasCertPoolReaderFromCertDB :: + (IOLike m, StandardHash blk) => + PerasCertDB m blk -> ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m +makePerasCertPoolReaderFromCertDB perasCertDB = + makePerasCertPoolReaderFromSnapshot (PerasCertDB.getCertSnapshot perasCertDB) + +makePerasCertPoolWriterFromCertDB :: + (StandardHash blk, IOLike m) => + PerasCertDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m +makePerasCertPoolWriterFromCertDB perasCertDB = + ObjectPoolWriter + { opwObjectId = getPerasCertRound + , opwAddObjects = \certs -> do + validatePerasCerts certs + >>= mapM_ (PerasCertDB.addCert perasCertDB) + , opwHasObject = do + certSnapshot <- PerasCertDB.getCertSnapshot perasCertDB + pure $ PerasCertDB.containsCert certSnapshot + } + +makePerasCertPoolReaderFromChainDB :: + (IOLike m, StandardHash blk) => + ChainDB m blk -> ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m +makePerasCertPoolReaderFromChainDB chainDB = + makePerasCertPoolReaderFromSnapshot (ChainDB.getPerasCertSnapshot chainDB) + +makePerasCertPoolWriterFromChainDB :: + (StandardHash blk, IOLike m) => + ChainDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m +makePerasCertPoolWriterFromChainDB chainDB = + ObjectPoolWriter + { opwObjectId = getPerasCertRound + , opwAddObjects = \certs -> do + validatePerasCerts certs + >>= mapM_ (ChainDB.addPerasCertAsync chainDB) + , opwHasObject = do + certSnapshot <- ChainDB.getPerasCertSnapshot chainDB + pure $ PerasCertDB.containsCert certSnapshot + } + +data PerasCertInboundException + = forall blk. PerasCertValidationError (PerasValidationErr blk) + +deriving instance Show PerasCertInboundException + +instance Exception PerasCertInboundException + +-- | Validate a list of 'PerasCert's, throwing a 'PerasCertInboundException' if +-- any of them are invalid. +validatePerasCerts :: + (StandardHash blk, MonadThrow m) => + [PerasCert blk] -> + m [ValidatedPerasCert blk] +validatePerasCerts certs = do + let perasCfg = makePerasCfg Nothing + -- TODO replace the mocked-up Nothing with a real + -- 'BlockConfig' when all the plumbing is in place + -- see https://github.com/tweag/cardano-peras/issues/73 + -- see https://github.com/tweag/cardano-peras/issues/120 + case traverse (validatePerasCert perasCfg) certs of + Left validationErr -> throw (PerasCertValidationError validationErr) + Right validatedCerts -> return validatedCerts diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs new file mode 100644 index 0000000000..52024ecdc8 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs @@ -0,0 +1,278 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | Defines the outbound side of the ObjectDiffusion mini-protocol. +-- This protocol is inspired from TxSubmission and aims to provide a generic +-- way to diffuse objects for Peras, i.e. Peras votes and certificates. +-- +-- See the design document here: https://tweag.github.io/cardano-peras/peras-design.pdf +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound + ( objectDiffusionOutbound + , TraceObjectDiffusionOutbound (..) + , ObjectDiffusionOutboundError (..) + ) where + +import Control.Exception (assert) +import Control.Monad (join, unless, when) +import Control.Monad.Class.MonadSTM +import Control.Monad.Class.MonadThrow +import Control.Tracer (Tracer, traceWith) +import Data.Foldable qualified as Foldable +import Data.List.NonEmpty qualified as NonEmpty +import Data.Map (Map) +import Data.Map qualified as Map +import Data.Maybe (fromMaybe) +import Data.Sequence.Strict (StrictSeq) +import Data.Sequence.Strict qualified as Seq +import Data.Set qualified as Set +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + +data TraceObjectDiffusionOutbound objectId object + = TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq + | -- | The IDs to be sent in the response + TraceObjectDiffusionOutboundSendMsgReplyObjectIds [objectId] + | -- | The IDs of the objects requested. + TraceObjectDiffusionOutboundRecvMsgRequestObjects + [objectId] + | -- | The objects to be sent in the response. + TraceObjectDiffusionOutboundSendMsgReplyObjects + [object] + | -- | There is a mismatch between the result from the in-memory object pool + -- index and the content we can load so we have to retry the whole monad m + -- action to load new objects. + TraceObjectDiffusionOutboundUnableToLoadObjectsThusRetrying + | -- | Received 'MsgDone' + TraceObjectDiffusionOutboundTerminated + deriving Show + +data ObjectDiffusionOutboundError + = ProtocolErrorAckedTooManyObjectIds + | ProtocolErrorRequestedNothing + | ProtocolErrorRequestedTooManyObjectIds NumObjectIdsReq NumObjectsOutstanding + | ProtocolErrorRequestBlocking + | ProtocolErrorRequestNonBlocking + | ProtocolErrorRequestedUnavailableObject + | ProtocolErrorRequestedDuplicateObject + deriving Show + +instance Exception ObjectDiffusionOutboundError where + displayException ProtocolErrorAckedTooManyObjectIds = + "The peer tried to acknowledged more objectIds than are available to do so." + displayException (ProtocolErrorRequestedTooManyObjectIds reqNo maxUnacked) = + "The peer requested " + ++ show reqNo + ++ " objectIds which would put the " + ++ "total in flight over the limit of " + ++ show maxUnacked + displayException ProtocolErrorRequestedNothing = + "The peer requested zero objectIds." + displayException ProtocolErrorRequestBlocking = + "The peer made a blocking request for more objectIds when there are still " + ++ "unacknowledged objectIds. It should have used a non-blocking request." + displayException ProtocolErrorRequestNonBlocking = + "The peer made a non-blocking request for more objectIds when there are " + ++ "no unacknowledged objectIds. It should have used a blocking request." + displayException ProtocolErrorRequestedUnavailableObject = + "The peer requested an object which is not available, either " + ++ "because it was never available or because it was previously requested." + displayException ProtocolErrorRequestedDuplicateObject = + "The peer requested the same object twice." + +data OutboundSt objectId object ticketNo = OutboundSt + { outstandingFifo :: !(StrictSeq object) + , lastTicketNo :: !ticketNo + } + +objectDiffusionOutbound :: + forall objectId object ticketNo m. + (Ord objectId, Ord ticketNo, MonadSTM m, MonadThrow m) => + Tracer m (TraceObjectDiffusionOutbound objectId object) -> + -- | Maximum number of unacknowledged objectIds allowed + NumObjectsOutstanding -> + ObjectPoolReader objectId object ticketNo m -> + NodeToNodeVersion -> + ObjectDiffusionOutbound objectId object m () +objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version = + ObjectDiffusionOutbound (pure (makeBundle $ OutboundSt Seq.empty oprZeroTicketNo)) + where + makeBundle :: OutboundSt objectId object ticketNo -> OutboundStIdle objectId object m () + makeBundle !st = + OutboundStIdle + { recvMsgRequestObjectIds = recvMsgRequestObjectIds st + , recvMsgRequestObjects = recvMsgRequestObjects st + , recvMsgDone = traceWith tracer TraceObjectDiffusionOutboundTerminated + } + + updateStNewObjects :: + OutboundSt objectId object ticketNo -> + Map ticketNo object -> + OutboundSt objectId object ticketNo + updateStNewObjects !OutboundSt{..} newObjectsWithTicketNos = + -- These objects should all be fresh + assert (all (> lastTicketNo) (Map.keys newObjectsWithTicketNos)) $ + let !outstandingFifo' = + Foldable.foldl' + (Seq.|>) + outstandingFifo + newObjectsWithTicketNos + !lastTicketNo' = case Map.lookupMax newObjectsWithTicketNos of + Nothing -> lastTicketNo + Just (ticketNo, _) -> ticketNo + in OutboundSt + { outstandingFifo = outstandingFifo' + , lastTicketNo = lastTicketNo' + } + + recvMsgRequestObjectIds :: + forall blocking. + OutboundSt objectId object ticketNo -> + SingBlockingStyle blocking -> + NumObjectIdsAck -> + NumObjectIdsReq -> + m (OutboundStObjectIds blocking objectId object m ()) + recvMsgRequestObjectIds !st@OutboundSt{..} blocking numIdsToAck numIdsToReq = do + traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjectIds numIdsToReq) + + when (numIdsToAck > fromIntegral (Seq.length outstandingFifo)) $ + throwIO ProtocolErrorAckedTooManyObjectIds + + when + ( Seq.length outstandingFifo + - fromIntegral numIdsToAck + + fromIntegral numIdsToReq + > fromIntegral maxFifoLength + ) + $ throwIO (ProtocolErrorRequestedTooManyObjectIds numIdsToReq maxFifoLength) + + -- First we update our FIFO to remove the number of objectIds that the + -- inbound peer has acknowledged. + let !outstandingFifo' = Seq.drop (fromIntegral numIdsToAck) outstandingFifo + -- must specify the type here otherwise GHC complains about mismatch objectId types + st' :: OutboundSt objectId object ticketNo + !st' = st{outstandingFifo = outstandingFifo'} + + -- Grab info about any new objects after the last object ticketNo we've + -- seen, up to the number that the peer has requested. + case blocking of + ----------------------------------------------------------------------- + SingBlocking -> do + when (numIdsToReq == 0) $ + throwIO ProtocolErrorRequestedNothing + unless (Seq.null outstandingFifo') $ + throwIO ProtocolErrorRequestBlocking + + -- oprObjectIdsAfter returns early with Nothing if there are no new + -- objects after lastTicketNo. So we retry efficiently using STM until + -- we get 'Just'. + -- But even if we get 'Just', when loading the actual objects (as a + -- non-STM action, taking place in monad m), we might find that the map + -- is empty (because objects were deleted from the pool in the meantime). + -- So we have a outer-level retry loop, this time in monad m. + -- + -- The only case were the outer-level retry loop would spin forever is + -- if the in-memory index of the object pool keeps indicating that + -- there are new objects after 'lastTicketNo', but when we try to load + -- them, they are not there anymore. This would indicate a bug in the + -- object pool implementation. + newObjects <- retryMaybeWith (traceWith tracer TraceObjectDiffusionOutboundUnableToLoadObjectsThusRetrying) + . join + . atomically + $ do + mLoadNewObjects <- oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq) + maybe retry (pure . fmap ensureNonEmpty) mLoadNewObjects + + let !newIds = oprObjectId <$> Map.elems newObjects + st'' = updateStNewObjects st' newObjects + + traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds) + + pure $ + SendMsgReplyObjectIds + (BlockingReply (NonEmpty.fromList $ newIds)) + (makeBundle st'') + + ----------------------------------------------------------------------- + SingNonBlocking -> do + when (numIdsToReq == 0 && numIdsToAck == 0) $ + throwIO ProtocolErrorRequestedNothing + when (Seq.null outstandingFifo') $ + throwIO ProtocolErrorRequestNonBlocking + + -- oprObjectIdsAfter returns early with Nothing if there are no new + -- objects after lastTicketNo. So we use fromMaybe to provide an empty map + -- in that case. + newObjects <- + join . atomically . fmap (fromMaybe $ pure Map.empty) $ + oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq) + + let !newIds = oprObjectId <$> Map.elems newObjects + st'' = updateStNewObjects st' newObjects + + traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds) + + pure (SendMsgReplyObjectIds (NonBlockingReply newIds) (makeBundle st'')) + + recvMsgRequestObjects :: + OutboundSt objectId object ticketNo -> + [objectId] -> + m (OutboundStObjects objectId object m ()) + recvMsgRequestObjects !st@OutboundSt{..} requestedIds = do + traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjects requestedIds) + + -- All the objects correspond to advertised objectIds are already in the + -- outstandingFifo. So we don't need to read from the object pool here. + + -- I've optimized the search to do only one traversal of 'outstandingFifo'. + -- When the 'requestedIds' is exactly the whole 'outstandingFifo', then this + -- should take O(n * log n) time. + -- + -- TODO: We might need to revisit the underlying 'outstandingFifo' data + -- structure and the search if performance isn't sufficient when we'll use + -- ObjectDiffusion for votes diffusion (and not just cert diffusion). + + let requestedIdsSet = Set.fromList requestedIds + + when (Set.size requestedIdsSet /= length requestedIds) $ + throwIO ProtocolErrorRequestedDuplicateObject + + let requestedObjects = + foldr + ( \obj acc -> + if Set.member (oprObjectId obj) requestedIdsSet + then obj : acc + else acc + ) + [] + outstandingFifo + + when (Set.size requestedIdsSet /= length requestedObjects) $ + throwIO ProtocolErrorRequestedUnavailableObject + + traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjects requestedObjects) + + pure (SendMsgReplyObjects requestedObjects (makeBundle st)) + +-- | Retry an action that returns 'Maybe a' until it returns 'Just a'. +-- The 'onRetry' action is executed before each retry. +retryMaybeWith :: Monad m => m () -> m (Maybe a) -> m a +retryMaybeWith onRetry action = loop + where + loop = do + mx <- action + case mx of + Just x -> pure x + Nothing -> onRetry >> loop + +-- | Convert an empty map to 'Nothing', and a non-empty map to 'Just' that map. +-- +-- TODO: we could alternatively replace this with 'Data.NonEmpty.Map.nonEmptyMap' +ensureNonEmpty :: Map k v -> Maybe (Map k v) +ensureNonEmpty m + | Map.null m = Nothing + | otherwise = Just m diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs new file mode 100644 index 0000000000..ba0ba934a2 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/PerasCert.hs @@ -0,0 +1,41 @@ +-- | This module defines type aliases for the ObjectDiffusion protocol applied +-- to PerasCert diffusion. +module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert + ( TracePerasCertDiffusionInbound + , TracePerasCertDiffusionOutbound + , PerasCertPoolReader + , PerasCertPoolWriter + , PerasCertDiffusionInboundPipelined + , PerasCertDiffusionOutbound + , PerasCertDiffusion + ) where + +import Ouroboros.Consensus.Block +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound +import Ouroboros.Consensus.Storage.PerasCertDB.API +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound (ObjectDiffusionInboundPipelined) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound (ObjectDiffusionOutbound) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type (ObjectDiffusion) + +type TracePerasCertDiffusionInbound blk = + TraceObjectDiffusionInbound PerasRoundNo (PerasCert blk) + +type TracePerasCertDiffusionOutbound blk = + TraceObjectDiffusionOutbound PerasRoundNo (PerasCert blk) + +type PerasCertPoolReader blk m = + ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m + +type PerasCertPoolWriter blk m = + ObjectPoolWriter PerasRoundNo (PerasCert blk) m + +type PerasCertDiffusionInboundPipelined blk m a = + ObjectDiffusionInboundPipelined PerasRoundNo (PerasCert blk) m a + +type PerasCertDiffusionOutbound blk m a = + ObjectDiffusionOutbound PerasRoundNo (PerasCert blk) m a + +type PerasCertDiffusion blk = + ObjectDiffusion PerasRoundNo (PerasCert blk) diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs index 6520aae47c..6a4fc87229 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Node/Serialisation.hs @@ -6,8 +6,11 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE StandaloneKindSignatures #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE UndecidableInstances #-} -- | Serialisation for sending things across the network. @@ -33,8 +36,8 @@ module Ouroboros.Consensus.Node.Serialisation , Some (..) ) where -import Codec.CBOR.Decoding (Decoder) -import Codec.CBOR.Encoding (Encoding) +import Codec.CBOR.Decoding (Decoder, decodeListLenOf) +import Codec.CBOR.Encoding (Encoding, encodeListLen) import Codec.Serialise (Serialise (decode, encode)) import Data.Kind import Data.SOP.BasicFunctors @@ -47,7 +50,15 @@ import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.TypeFamilyWrappers import Ouroboros.Consensus.Util (Some (..)) -import Ouroboros.Network.Block (unwrapCBORinCBOR, wrapCBORinCBOR) +import Ouroboros.Network.Block + ( Tip + , decodePoint + , decodeTip + , encodePoint + , encodeTip + , unwrapCBORinCBOR + , wrapCBORinCBOR + ) {------------------------------------------------------------------------------- NodeToNode @@ -173,6 +184,29 @@ deriving newtype instance SerialiseNodeToNode blk (GenTxId blk) => SerialiseNodeToNode blk (WrapGenTxId blk) +instance ConvertRawHash blk => SerialiseNodeToNode blk (Point blk) where + encodeNodeToNode _ccfg _version = encodePoint $ encodeRawHash (Proxy @blk) + decodeNodeToNode _ccfg _version = decodePoint $ decodeRawHash (Proxy @blk) + +instance ConvertRawHash blk => SerialiseNodeToNode blk (Tip blk) where + encodeNodeToNode _ccfg _version = encodeTip $ encodeRawHash (Proxy @blk) + decodeNodeToNode _ccfg _version = decodeTip $ decodeRawHash (Proxy @blk) + +instance SerialiseNodeToNode blk PerasRoundNo where + encodeNodeToNode _ccfg _version = encode + decodeNodeToNode _ccfg _version = decode +instance ConvertRawHash blk => SerialiseNodeToNode blk (PerasCert blk) where + -- Consistent with the 'Serialise' instance for 'PerasCert' defined in Ouroboros.Consensus.Block.SupportsPeras + encodeNodeToNode ccfg version PerasCert{..} = + encodeListLen 2 + <> encodeNodeToNode ccfg version pcCertRound + <> encodeNodeToNode ccfg version pcCertBoostedBlock + decodeNodeToNode ccfg version = do + decodeListLenOf 2 + pcCertRound <- decodeNodeToNode ccfg version + pcCertBoostedBlock <- decodeNodeToNode ccfg version + pure $ PerasCert pcCertRound pcCertBoostedBlock + deriving newtype instance SerialiseNodeToClient blk (GenTxId blk) => SerialiseNodeToClient blk (WrapGenTxId blk) diff --git a/ouroboros-consensus/test/consensus-test/Main.hs b/ouroboros-consensus/test/consensus-test/Main.hs index beddd1f7d2..79d681213a 100644 --- a/ouroboros-consensus/test/consensus-test/Main.hs +++ b/ouroboros-consensus/test/consensus-test/Main.hs @@ -16,6 +16,8 @@ import qualified Test.Consensus.MiniProtocol.BlockFetch.Client (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.CSJ (tests) import qualified Test.Consensus.MiniProtocol.ChainSync.Client (tests) import qualified Test.Consensus.MiniProtocol.LocalStateQuery.Server (tests) +import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke (tests) +import qualified Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke (tests) import qualified Test.Consensus.Peras.WeightSnapshot (tests) import qualified Test.Consensus.Util.MonadSTM.NormalForm (tests) import qualified Test.Consensus.Util.Versioned (tests) @@ -37,6 +39,8 @@ tests = , Test.Consensus.MiniProtocol.BlockFetch.Client.tests , Test.Consensus.MiniProtocol.ChainSync.CSJ.tests , Test.Consensus.MiniProtocol.ChainSync.Client.tests + , Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke.tests + , Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke.tests , Test.Consensus.MiniProtocol.LocalStateQuery.Server.tests , testGroup "Mempool" diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs new file mode 100644 index 0000000000..0320962c92 --- /dev/null +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs @@ -0,0 +1,145 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE UndecidableInstances #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Test.Consensus.MiniProtocol.ObjectDiffusion.PerasCert.Smoke (tests) where + +import Control.Tracer (contramap, nullTracer) +import Data.Functor.Identity (Identity (..)) +import qualified Data.List.NonEmpty as NE +import qualified Data.Map as Map +import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) +import Ouroboros.Consensus.Block.SupportsPeras +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert +import Ouroboros.Consensus.Storage.PerasCertDB.API + ( AddPerasCertResult (..) + , PerasCertDB + , PerasCertTicketNo + ) +import qualified Ouroboros.Consensus.Storage.PerasCertDB.API as PerasCertDB +import qualified Ouroboros.Consensus.Storage.PerasCertDB.Impl as PerasCertDB +import Ouroboros.Consensus.Util.IOLike +import Ouroboros.Network.Block (Point (..), SlotNo (SlotNo), StandardHash) +import Ouroboros.Network.Point (Block (Block), WithOrigin (..)) +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( objectDiffusionInboundPeerPipelined + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound (objectDiffusionOutboundPeer) +import Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke + ( ListWithUniqueIds (..) + , WithId + , genListWithUniqueIds + , genProtocolConstants + , getId + , prop_smoke_object_diffusion + ) +import Test.QuickCheck +import Test.Tasty +import Test.Tasty.QuickCheck (testProperty) +import Test.Util.TestBlock + +tests :: TestTree +tests = + testGroup + "ObjectDiffusion.PerasCert.Smoke" + [ testProperty + "PerasCertDiffusion smoke test" + prop_smoke + ] + +genPoint :: Gen (Point (TestBlock)) +genPoint = + -- Sometimes pick the genesis point + frequency + [ (1, pure $ Point Origin) + , + ( 50 + , do + slotNo <- SlotNo <$> arbitrary + hash <- TestHash . NE.fromList . getNonEmpty <$> arbitrary + pure $ Point (At (Block slotNo hash)) + ) + ] + +genPerasCert :: Gen (PerasCert TestBlock) +genPerasCert = do + pcCertRound <- PerasRoundNo <$> arbitrary + pcCertBoostedBlock <- genPoint + pure $ PerasCert{pcCertRound, pcCertBoostedBlock} + +instance WithId (PerasCert blk) PerasRoundNo where + getId = pcCertRound + +newCertDB :: (IOLike m, StandardHash blk) => [PerasCert blk] -> m (PerasCertDB m blk) +newCertDB certs = do + db <- PerasCertDB.openDB (PerasCertDB.PerasCertDbArgs @Identity nullTracer) + mapM_ + ( \cert -> do + let validatedCert = + ValidatedPerasCert + { vpcCert = cert + , vpcCertBoost = boostPerCert + } + result <- PerasCertDB.addCert db validatedCert + case result of + AddedPerasCertToDB -> pure () + PerasCertAlreadyInDB -> throwIO (userError "Expected AddedPerasCertToDB, but cert was already in DB") + ) + certs + pure db + +prop_smoke :: Property +prop_smoke = + forAll genProtocolConstants $ \protocolConstants -> + forAll (genListWithUniqueIds genPerasCert) $ \(ListWithUniqueIds certs) -> + let + runOutboundPeer outbound outboundChannel tracer = + runPeer + ((\x -> "Outbound (Client): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + outboundChannel + (objectDiffusionOutboundPeer outbound) + >> pure () + runInboundPeer inbound inboundChannel tracer = + runPipelinedPeer + ((\x -> "Inbound (Server): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + inboundChannel + (objectDiffusionInboundPeerPipelined inbound) + >> pure () + mkPoolInterfaces :: + forall m. + IOLike m => + m + ( ObjectPoolReader PerasRoundNo (PerasCert TestBlock) PerasCertTicketNo m + , ObjectPoolWriter PerasRoundNo (PerasCert TestBlock) m + , m [PerasCert TestBlock] + ) + mkPoolInterfaces = do + outboundPool <- newCertDB certs + inboundPool <- newCertDB [] + + let outboundPoolReader = makePerasCertPoolReaderFromCertDB outboundPool + inboundPoolWriter = makePerasCertPoolWriterFromCertDB inboundPool + getAllInboundPoolContent = atomically $ do + snap <- PerasCertDB.getCertSnapshot inboundPool + let rawContent = + Map.toAscList $ + PerasCertDB.getCertsAfter snap (PerasCertDB.zeroPerasCertTicketNo) + pure $ getPerasCert . snd <$> rawContent + + return (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) + in + prop_smoke_object_diffusion + protocolConstants + certs + runOutboundPeer + runInboundPeer + mkPoolInterfaces diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs new file mode 100644 index 0000000000..bcd952fb31 --- /dev/null +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/Smoke.hs @@ -0,0 +1,326 @@ +{-# LANGUAGE DerivingVia #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | Smoke tests for the object diffusion protocol. This uses a trivial object +-- pool and checks that a few objects can indeed be transferred from the +-- outbound to the inbound peer. +module Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke + ( tests + , WithId (..) + , ListWithUniqueIds (..) + , genListWithUniqueIds + , ProtocolConstants + , genProtocolConstants + , prop_smoke_object_diffusion + ) where + +import Control.Monad (void) +import Control.Monad.IOSim (runSimStrictShutdown) +import Control.ResourceRegistry (forkLinkedThread, withRegistry) +import Control.Tracer (Tracer, nullTracer, traceWith) +import Data.Containers.ListUtils (nubOrdOn) +import Data.Data (Typeable) +import Data.Functor.Contravariant (contramap) +import qualified Data.Map as Map +import Network.TypedProtocol.Channel (Channel, createConnectedChannels) +import Network.TypedProtocol.Codec (AnyMessage) +import Network.TypedProtocol.Driver.Simple (runPeer, runPipelinedPeer) +import NoThunks.Class (NoThunks) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound + ( objectDiffusionInbound + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API + ( ObjectPoolReader (..) + , ObjectPoolWriter (..) + ) +import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound) +import Ouroboros.Consensus.Util.IOLike + ( IOLike + , MonadDelay (..) + , MonadSTM (..) + , StrictTVar + , modifyTVar + , readTVar + , uncheckedNewTVarM + , writeTVar + ) +import Ouroboros.Network.ControlMessage (ControlMessage (..)) +import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..)) +import Ouroboros.Network.Protocol.ObjectDiffusion.Codec (codecObjectDiffusionId) +import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound + ( ObjectDiffusionInboundPipelined + , objectDiffusionInboundPeerPipelined + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound + ( ObjectDiffusionOutbound + , objectDiffusionOutboundPeer + ) +import Ouroboros.Network.Protocol.ObjectDiffusion.Type + ( NumObjectIdsReq (..) + , NumObjectsOutstanding (..) + , NumObjectsReq (..) + , ObjectDiffusion + ) +import Test.QuickCheck +import Test.Tasty +import Test.Tasty.QuickCheck +import Test.Util.Orphans.IOLike () + +tests :: TestTree +tests = + testGroup + "ObjectDiffusion.Smoke" + [ testProperty + "ObjectDiffusion smoke test with mock objects" + prop_smoke + ] + +class WithId a idTy | a -> idTy where + getId :: a -> idTy + +newtype ListWithUniqueIds a idTy = ListWithUniqueIds [a] + deriving (Eq, Show, Ord) + +genListWithUniqueIds :: (Ord idTy, WithId a idTy) => Gen a -> Gen (ListWithUniqueIds a idTy) +genListWithUniqueIds genA = ListWithUniqueIds . nubOrdOn getId <$> listOf genA + +instance WithId SmokeObject SmokeObjectId where getId = getSmokeObjectId + +{------------------------------------------------------------------------------- + Mock objectPools +-------------------------------------------------------------------------------} + +newtype SmokeObjectId = SmokeObjectId Int + deriving (Eq, Ord, Show, NoThunks) + +newtype SmokeObject = SmokeObject {getSmokeObjectId :: SmokeObjectId} + deriving (Eq, Ord, Show, NoThunks) + +genSmokeObject :: Gen SmokeObject +genSmokeObject = SmokeObject . SmokeObjectId <$> arbitrary + +newtype SmokeObjectPool m = SmokeObjectPool (StrictTVar m [SmokeObject]) + +newObjectPool :: MonadSTM m => [SmokeObject] -> m (SmokeObjectPool m) +newObjectPool initialPoolContent = SmokeObjectPool <$> uncheckedNewTVarM initialPoolContent + +makeObjectPoolReader :: + MonadSTM m => SmokeObjectPool m -> ObjectPoolReader SmokeObjectId SmokeObject Int m +makeObjectPoolReader (SmokeObjectPool poolContentTvar) = + ObjectPoolReader + { oprObjectId = getSmokeObjectId + , oprObjectsAfter = \minTicketNo limit -> do + poolContent <- readTVar poolContentTvar + let newContent = drop minTicketNo $ zip [(1 :: Int) ..] poolContent + if null newContent + then pure Nothing + else + pure . Just . pure $ + Map.fromList (take (fromIntegral limit) newContent) + , oprZeroTicketNo = 0 + } + +makeObjectPoolWriter :: + MonadSTM m => SmokeObjectPool m -> ObjectPoolWriter SmokeObjectId SmokeObject m +makeObjectPoolWriter (SmokeObjectPool poolContentTvar) = + ObjectPoolWriter + { opwObjectId = getSmokeObjectId + , opwAddObjects = \objects -> do + atomically $ modifyTVar poolContentTvar (++ objects) + return () + , opwHasObject = do + poolContent <- readTVar poolContentTvar + pure $ \objectId -> any (\obj -> getSmokeObjectId obj == objectId) poolContent + } + +mkMockPoolInterfaces :: + MonadSTM m => + [SmokeObject] -> + m + ( ObjectPoolReader SmokeObjectId SmokeObject Int m + , ObjectPoolWriter SmokeObjectId SmokeObject m + , m [SmokeObject] + ) +mkMockPoolInterfaces objects = do + outboundPool <- newObjectPool objects + inboundPool@(SmokeObjectPool tvar) <- newObjectPool [] + + let outboundPoolReader = makeObjectPoolReader outboundPool + inboundPoolWriter = makeObjectPoolWriter inboundPool + + return (outboundPoolReader, inboundPoolWriter, atomically $ readTVar tvar) + +{------------------------------------------------------------------------------- + Main properties +-------------------------------------------------------------------------------} + +-- Protocol constants + +newtype ProtocolConstants + = ProtocolConstants (NumObjectsOutstanding, NumObjectIdsReq, NumObjectsReq) + deriving Show + +genProtocolConstants :: Gen ProtocolConstants +genProtocolConstants = do + maxFifoSize <- choose (5, 20) + maxIdsToReq <- choose (3, maxFifoSize) + maxObjectsToReq <- choose (2, maxIdsToReq) + pure $ + ProtocolConstants + ( NumObjectsOutstanding maxFifoSize + , NumObjectIdsReq maxIdsToReq + , NumObjectsReq maxObjectsToReq + ) + +nodeToNodeVersion :: NodeToNodeVersion +nodeToNodeVersion = NodeToNodeV_14 + +prop_smoke :: Property +prop_smoke = + forAll genProtocolConstants $ \protocolConstants -> + forAll (genListWithUniqueIds genSmokeObject) $ \(ListWithUniqueIds objects) -> + let + runOutboundPeer outbound outboundChannel tracer = + runPeer + ((\x -> "Outbound (Server): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + outboundChannel + (objectDiffusionOutboundPeer outbound) + >> pure () + + runInboundPeer inbound inboundChannel tracer = + runPipelinedPeer + ((\x -> "Inbound (Client): " ++ show x) `contramap` tracer) + codecObjectDiffusionId + inboundChannel + (objectDiffusionInboundPeerPipelined inbound) + >> pure () + in + prop_smoke_object_diffusion + protocolConstants + objects + runOutboundPeer + runInboundPeer + (mkMockPoolInterfaces objects) + +--- The core logic of the smoke test is shared between the generic smoke tests for ObjectDiffusion, and the ones specialised to PerasCert/PerasVote diffusion +prop_smoke_object_diffusion :: + ( Eq object + , Show object + , Ord objectId + , Typeable objectId + , Typeable object + , NoThunks objectId + , Show objectId + , NoThunks object + , Ord ticketNo + ) => + ProtocolConstants -> + [object] -> + ( forall m. + IOLike m => + ObjectDiffusionOutbound objectId object m () -> + Channel m (AnyMessage (ObjectDiffusion objectId object)) -> + (Tracer m String) -> + m () + ) -> + ( forall m. + IOLike m => + ObjectDiffusionInboundPipelined objectId object m () -> + (Channel m (AnyMessage (ObjectDiffusion objectId object))) -> + (Tracer m String) -> + m () + ) -> + ( forall m. + IOLike m => + m + ( ObjectPoolReader objectId object ticketNo m + , ObjectPoolWriter objectId object m + , m [object] + ) + ) -> + Property +prop_smoke_object_diffusion + (ProtocolConstants (maxFifoSize, maxIdsToReq, maxObjectsToReq)) + objects + runOutboundPeer + runInboundPeer + mkPoolInterfaces = + let + simulationResult = runSimStrictShutdown $ do + let tracer = nullTracer + + traceWith tracer "========== [ Starting ObjectDiffusion smoke test ] ==========" + traceWith tracer "objects: " + traceWith tracer (show objects) + + (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) <- mkPoolInterfaces + + -- We wait until the inbound pool content stabilizes + -- Caveat: in the case where objects are continuously added to the + -- outbound pool, this may never terminate. + let waitUntilSettlement prevValue = do + -- TODO: should have a delay value equal to 4·Δ + Ɛ + -- were Δ is the delay in which any message is delivered on the + -- network and Ɛ is a small margin to encompass computation time; + -- as in the worst case, we need 4 echanged messages + -- (+ computation time, assumed negligible w.r.t. network delays) + -- to see a state update on the inbound side + threadDelay 10_000 + newValue <- getAllInboundPoolContent + if newValue == prevValue + then pure () + else waitUntilSettlement newValue + + controlMessage <- uncheckedNewTVarM Continue + + let + inbound = + objectDiffusionInbound + tracer + ( maxFifoSize + , maxIdsToReq + , maxObjectsToReq + ) + inboundPoolWriter + nodeToNodeVersion + (readTVar controlMessage) + + outbound = + objectDiffusionOutbound + tracer + maxFifoSize + outboundPoolReader + nodeToNodeVersion + + withRegistry $ \reg -> do + (outboundChannel, inboundChannel) <- createConnectedChannels + _outboundThread <- + forkLinkedThread reg "ObjectDiffusion Outbound peer thread" $ + runOutboundPeer outbound outboundChannel tracer + _inboundThread <- + forkLinkedThread reg "ObjectDiffusion Inbound peer thread" $ + runInboundPeer inbound inboundChannel tracer + + void $ waitUntilSettlement [] + atomically $ writeTVar controlMessage Terminate + + traceWith tracer "========== [ ObjectDiffusion smoke test finished ] ==========" + + poolContent <- getAllInboundPoolContent + traceWith tracer "inboundPoolContent: " + traceWith tracer (show poolContent) + traceWith tracer "========== ======================================= ==========" + pure poolContent + in + case simulationResult of + Right inboundPoolContent -> inboundPoolContent === objects + Left msg -> counterexample (show msg) $ property False diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs index 5df29e3d27..41e6c5b92b 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/PerasCertDB/StateMachine.hs @@ -113,7 +113,18 @@ instance StateModel Model where precondition (Model model) = \case OpenDB -> not model.open - _ -> model.open + action -> + model.open && case action of + CloseDB -> True + -- Equivocating certificates are not possible by construction, + -- so we should ensure that adding a certificate with the same round + -- number but different content is rejected. + -- See https://tweag.github.io/cardano-peras/peras-design.pdf#subsection.2.2.2 + AddCert cert -> all p model.certs + where + p cert' = getPerasCertRound cert /= getPerasCertRound cert' || cert == cert' + GetWeightSnapshot -> True + GarbageCollect _slot -> True deriving stock instance Show (Action Model a) deriving stock instance Eq (Action Model a)