55{-# LANGUAGE GADTs #-}
66{-# LANGUAGE GeneralizedNewtypeDeriving #-}
77{-# LANGUAGE KindSignatures #-}
8+ {-# LANGUAGE NumericUnderscores #-}
89{-# LANGUAGE RankNTypes #-}
910{-# LANGUAGE ScopedTypeVariables #-}
1011
@@ -21,8 +22,9 @@ module Test.Consensus.MiniProtocol.ObjectDiffusion.Smoke
2122 , prop_smoke_object_diffusion
2223 ) where
2324
25+ import Control.Monad (void )
2426import Control.Monad.IOSim (runSimStrictShutdown )
25- import Control.ResourceRegistry (forkLinkedThread , waitAnyThread , withRegistry )
27+ import Control.ResourceRegistry (forkLinkedThread , withRegistry )
2628import Control.Tracer (Tracer , nullTracer , traceWith )
2729import Data.Containers.ListUtils (nubOrdOn )
2830import Data.Data (Typeable )
@@ -257,9 +259,27 @@ prop_smoke_object_diffusion
257259 let tracer = nullTracer
258260
259261 traceWith tracer " ========== [ Starting ObjectDiffusion smoke test ] =========="
262+ traceWith tracer " objects: "
260263 traceWith tracer (show objects)
261264
262265 (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent) <- mkPoolInterfaces
266+
267+ -- We wait until the inbound pool content stabilizes
268+ -- Caveat: in the case where objects are continuously added to the
269+ -- outbound pool, this may never terminate.
270+ let waitUntilSettlement prevValue = do
271+ -- TODO: should have a delay value equal to 4·Δ + Ɛ
272+ -- were Δ is the delay in which any message is delivered on the
273+ -- network and Ɛ is a small margin to encompass computation time;
274+ -- as in the worst case, we need 4 echanged messages
275+ -- (+ computation time, assumed negligible w.r.t. network delays)
276+ -- to see a state update on the inbound side
277+ threadDelay 10_000
278+ newValue <- getAllInboundPoolContent
279+ if newValue == prevValue
280+ then pure ()
281+ else waitUntilSettlement newValue
282+
263283 controlMessage <- uncheckedNewTVarM Continue
264284
265285 let
@@ -283,22 +303,20 @@ prop_smoke_object_diffusion
283303
284304 withRegistry $ \ reg -> do
285305 (outboundChannel, inboundChannel) <- createConnectedChannels
286- outboundThread <-
306+ _outboundThread <-
287307 forkLinkedThread reg " ObjectDiffusion Outbound peer thread" $
288308 runOutboundPeer outbound outboundChannel tracer
289- inboundThread <-
309+ _inboundThread <-
290310 forkLinkedThread reg " ObjectDiffusion Inbound peer thread" $
291311 runInboundPeer inbound inboundChannel tracer
292- controlMessageThread <- forkLinkedThread reg " ObjectDiffusion Control thread" $ do
293- threadDelay 1000 -- give a head start to the other threads
294- atomically $ writeTVar controlMessage Terminate
295- threadDelay 1000 -- wait for the other threads to finish
296- waitAnyThread [outboundThread, inboundThread, controlMessageThread]
312+
313+ void $ waitUntilSettlement []
314+ atomically $ writeTVar controlMessage Terminate
297315
298316 traceWith tracer " ========== [ ObjectDiffusion smoke test finished ] =========="
299- poolContent <- getAllInboundPoolContent
300317
301- traceWith tracer " inboundPoolContent:"
318+ poolContent <- getAllInboundPoolContent
319+ traceWith tracer " inboundPoolContent: "
302320 traceWith tracer (show poolContent)
303321 traceWith tracer " ========== ======================================= =========="
304322 pure poolContent
0 commit comments