@@ -6,13 +6,16 @@ module Ouroboros.Byron.Proxy.Index.ChainDB
66 ) where
77
88import Control.Exception (bracket )
9+ import Data.Word (Word64 )
910
11+ import Ouroboros.Byron.Proxy.Block (checkpointOffsets )
1012import Ouroboros.Byron.Proxy.Index.Types (Index )
1113import qualified Ouroboros.Byron.Proxy.Index.Types as Index
1214import Ouroboros.Consensus.Block (GetHeader (Header ))
1315import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry )
16+ import Ouroboros.Consensus.Protocol.Abstract (SecurityParam )
1417import Ouroboros.Network.Block (ChainUpdate (.. ), Point (.. ))
15- import Ouroboros.Network.Point (WithOrigin (Origin ))
18+ import Ouroboros.Network.Point (WithOrigin (Origin ), block )
1619import Ouroboros.Storage.ChainDB.API (ChainDB , Reader )
1720import qualified Ouroboros.Storage.ChainDB.API as ChainDB
1821
@@ -21,7 +24,7 @@ trackReaderBlocking
2124 :: ( Monad m )
2225 => Index m (Header blk )
2326 -> Reader m blk (Header blk )
24- -> m x
27+ -> m void
2528trackReaderBlocking idx reader = do
2629 instruction <- ChainDB. readerInstructionBlocking reader
2730 case instruction of
@@ -50,42 +53,69 @@ trackReader idx reader = do
5053 trackReader idx reader
5154 Nothing -> pure ()
5255
53- -- | Have an Index track a ChainDB using its Reader API. You probably want to
54- -- race this with some other thread that runs your application.
55- --
56- -- If the ChainDB does not contain the tip of the Index, then the whole index
57- -- will be rebuilt.
56+ -- | Have an Index track a ChainDB using its Reader API for the duration of
57+ -- some monadic action.
5858--
5959-- It will spawn a thread to do the index updates. This must be the only
6060-- index writer. It is run by `race` with the action, so exceptions in either
6161-- the action or the writer thread will be re-thrown here.
6262--
63- -- If the tip of the index is not in the ChainDB, then the entire index will be
64- -- rebuilt. This is not ideal: there may be an intersection. TODO would be
65- -- better to check the newest slot older than `k` back from tip of index, and
66- -- go from there.
63+ -- If the tip of the index is in the ChainDB, then no work must be done in the
64+ -- beginning. But if it's not in the ChainDB, there will have to be a rollback
65+ -- on the index. The SecurityParam k is used to decide how far back to try. If
66+ -- Only index entries at most k slots old will be checked against the
67+ -- ChainDB. If none are in it, then the entire index will be rebuild (rollback
68+ -- to Origin).
6769trackChainDB
68- :: forall blk void .
70+ :: forall blk t .
6971 ResourceRegistry IO
7072 -> Index IO (Header blk )
7173 -> ChainDB IO blk
72- -> IO void
73- trackChainDB rr idx cdb = bracket acquireReader releaseReader $ \ rdr -> do
74- tipPoint <- Index. tip idx
75- mPoint <- ChainDB. readerForward rdr [Point tipPoint]
76- -- `readerForward` docs say that if we get `Nothing`, the next reader
77- -- instruction may not be a rollback, so we'll manually roll the index
78- -- back. It's assumed the read pointer will be at origin (nothing else
79- -- would make sense).
80- case mPoint of
81- Nothing -> Index. rollbackward idx Origin
82- Just _ -> pure ()
83- -- First, block until the index is caught up to the tip ...
84- trackReader idx rdr
85- -- ... then attempt to stay in sync.
86- trackReaderBlocking idx rdr
74+ -> SecurityParam
75+ -> IO t
76+ -> IO t
77+ trackChainDB rr idx cdb k act = bracket acquireReader releaseReader $ \ rdr -> do
78+ checkpoints <- Index. streamFromTip idx checkpointsFold
79+ mPoint <- ChainDB. readerForward rdr checkpoints
80+ case mPoint of
81+ -- `readerForward` docs say that the next instruction will be a rollback,
82+ -- so we don't have to do anything here; the call to `trackReader` will
83+ -- do what needs to be done.
84+ Just _ -> pure ()
85+ -- `readerForward` docs say that if we get `Nothing`, the next reader
86+ -- instruction may not be a rollback, so we'll manually roll the index
87+ -- back. It's assumed the read pointer will be at origin (nothing else
88+ -- would make sense).
89+ Nothing -> Index. rollbackward idx Origin
90+ -- First, block until the index is caught up to the tip ...
91+ trackReader idx rdr
92+ -- ... then attempt to stay in sync.
93+ outcome <- race (trackReaderBlocking idx rdr) act
94+ case outcome of
95+ Left impossible -> impossible
96+ Right t -> pure t
8797 where
88- acquireReader :: IO (Reader IO blk (Header blk ))
89- acquireReader = ChainDB. newHeaderReader cdb rr
90- releaseReader :: Reader IO blk (Header blk ) -> IO ()
91- releaseReader = ChainDB. readerClose
98+ acquireReader :: IO (Reader IO blk (Header blk ))
99+ acquireReader = ChainDB. newHeaderReader cdb rr
100+ releaseReader :: Reader IO blk (Header blk ) -> IO ()
101+ releaseReader = ChainDB. readerClose
102+
103+ checkpointsFold :: Index. Fold (Header blk ) [Point blk ]
104+ checkpointsFold = checkpointsFoldN 0 (checkpointOffsets k)
105+
106+ -- Count up from 0 on the first parameter. Whenever it coincides with the
107+ -- head of the second parameter (an increasing list) include that point.
108+ -- Stop when the second list is empty.
109+ -- Since checkpointsFold always includes the paramater k, the k'th entry
110+ -- in the index will always be in here, unless the index is shorter
111+ -- than k. This block is _at least_ k slots behind the DB, so if it's not
112+ -- in the DB then the index is way out of date.
113+ checkpointsFoldN
114+ :: Word64
115+ -> [Word64 ]
116+ -> Index. Fold (Header blk ) [Point blk ]
117+ checkpointsFoldN _ [] = Index. Stop []
118+ checkpointsFoldN w (o : os) = Index. More [] $ \ slotNo hash ->
119+ if w == o
120+ then fmap ((:) (Point (block slotNo hash))) (checkpointsFoldN (w+ 1 ) os)
121+ else checkpointsFoldN (w+ 1 ) (o : os)
0 commit comments