@@ -5,14 +5,18 @@ module Ouroboros.Byron.Proxy.Index.ChainDB
55 ( trackChainDB
66 ) where
77
8+ import Control.Concurrent.Async (race )
89import Control.Exception (bracket )
10+ import Data.Word (Word64 )
911
12+ import Ouroboros.Byron.Proxy.Block (checkpointOffsets )
1013import Ouroboros.Byron.Proxy.Index.Types (Index )
1114import qualified Ouroboros.Byron.Proxy.Index.Types as Index
1215import Ouroboros.Consensus.Block (GetHeader (Header ))
1316import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry )
17+ import Ouroboros.Consensus.Protocol.Abstract (SecurityParam )
1418import Ouroboros.Network.Block (ChainUpdate (.. ), Point (.. ))
15- import Ouroboros.Network.Point (WithOrigin (Origin ))
19+ import Ouroboros.Network.Point (WithOrigin (Origin ), block )
1620import Ouroboros.Storage.ChainDB.API (ChainDB , Reader )
1721import qualified Ouroboros.Storage.ChainDB.API as ChainDB
1822
@@ -21,7 +25,7 @@ trackReaderBlocking
2125 :: ( Monad m )
2226 => Index m (Header blk )
2327 -> Reader m blk (Header blk )
24- -> m x
28+ -> m void
2529trackReaderBlocking idx reader = do
2630 instruction <- ChainDB. readerInstructionBlocking reader
2731 case instruction of
@@ -50,42 +54,69 @@ trackReader idx reader = do
5054 trackReader idx reader
5155 Nothing -> pure ()
5256
53- -- | Have an Index track a ChainDB using its Reader API. You probably want to
54- -- race this with some other thread that runs your application.
55- --
56- -- If the ChainDB does not contain the tip of the Index, then the whole index
57- -- will be rebuilt.
57+ -- | Have an Index track a ChainDB using its Reader API for the duration of
58+ -- some monadic action.
5859--
5960-- It will spawn a thread to do the index updates. This must be the only
6061-- index writer. It is run by `race` with the action, so exceptions in either
6162-- the action or the writer thread will be re-thrown here.
6263--
63- -- If the tip of the index is not in the ChainDB, then the entire index will be
64- -- rebuilt. This is not ideal: there may be an intersection. TODO would be
65- -- better to check the newest slot older than `k` back from tip of index, and
66- -- go from there.
64+ -- If the tip of the index is in the ChainDB, then no work must be done in the
65+ -- beginning. But if it's not in the ChainDB, there will have to be a rollback
66+ -- on the index. The SecurityParam k is used to decide how far back to try. If
67+ -- Only index entries at most k slots old will be checked against the
68+ -- ChainDB. If none are in it, then the entire index will be rebuild (rollback
69+ -- to Origin).
6770trackChainDB
68- :: forall blk void .
71+ :: forall blk t .
6972 ResourceRegistry IO
7073 -> Index IO (Header blk )
7174 -> ChainDB IO blk
72- -> IO void
73- trackChainDB rr idx cdb = bracket acquireReader releaseReader $ \ rdr -> do
74- tipPoint <- Index. tip idx
75- mPoint <- ChainDB. readerForward rdr [Point tipPoint]
76- -- `readerForward` docs say that if we get `Nothing`, the next reader
77- -- instruction may not be a rollback, so we'll manually roll the index
78- -- back. It's assumed the read pointer will be at origin (nothing else
79- -- would make sense).
80- case mPoint of
81- Nothing -> Index. rollbackward idx Origin
82- Just _ -> pure ()
83- -- First, block until the index is caught up to the tip ...
84- trackReader idx rdr
85- -- ... then attempt to stay in sync.
86- trackReaderBlocking idx rdr
75+ -> SecurityParam
76+ -> IO t
77+ -> IO t
78+ trackChainDB rr idx cdb k act = bracket acquireReader releaseReader $ \ rdr -> do
79+ checkpoints <- Index. streamFromTip idx checkpointsFold
80+ mPoint <- ChainDB. readerForward rdr checkpoints
81+ case mPoint of
82+ -- `readerForward` docs say that the next instruction will be a rollback,
83+ -- so we don't have to do anything here; the call to `trackReader` will
84+ -- do what needs to be done.
85+ Just _ -> pure ()
86+ -- `readerForward` docs say that if we get `Nothing`, the next reader
87+ -- instruction may not be a rollback, so we'll manually roll the index
88+ -- back. It's assumed the read pointer will be at origin (nothing else
89+ -- would make sense).
90+ Nothing -> Index. rollbackward idx Origin
91+ -- First, block until the index is caught up to the tip ...
92+ trackReader idx rdr
93+ -- ... then attempt to stay in sync.
94+ outcome <- race (trackReaderBlocking idx rdr) act
95+ case outcome of
96+ Left impossible -> impossible
97+ Right t -> pure t
8798 where
8899 acquireReader :: IO (Reader IO blk (Header blk ))
89100 acquireReader = ChainDB. deserialiseReader <$> ChainDB. newHeaderReader cdb rr
90101 releaseReader :: Reader IO blk (Header blk ) -> IO ()
91102 releaseReader = ChainDB. readerClose
103+
104+ checkpointsFold :: Index. Fold (Header blk ) [Point blk ]
105+ checkpointsFold = checkpointsFoldN 0 (checkpointOffsets k)
106+
107+ -- Count up from 0 on the first parameter. Whenever it coincides with the
108+ -- head of the second parameter (an increasing list) include that point.
109+ -- Stop when the second list is empty.
110+ -- Since checkpointsFold always includes the paramater k, the k'th entry
111+ -- in the index will always be in here, unless the index is shorter
112+ -- than k. This block is _at least_ k slots behind the DB, so if it's not
113+ -- in the DB then the index is way out of date.
114+ checkpointsFoldN
115+ :: Word64
116+ -> [Word64 ]
117+ -> Index. Fold (Header blk ) [Point blk ]
118+ checkpointsFoldN _ [] = Index. Stop []
119+ checkpointsFoldN w (o : os) = Index. More [] $ \ slotNo hash ->
120+ if w == o
121+ then fmap ((:) (Point (block slotNo hash))) (checkpointsFoldN (w+ 1 ) os)
122+ else checkpointsFoldN (w+ 1 ) (o : os)
0 commit comments