Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<!--
A new scriv changelog fragment.

Uncomment the section that is right (remove the HTML comment wrapper).
For top level release notes, leave all the headers commented out.
-->

### Patch

- Make forker tracers more informative, with enclosing times.

<!--
### Non-Breaking

- A bullet item for the Non-Breaking category.

-->
<!--
### Breaking

- A bullet item for the Breaking category.

-->
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.Forker
-- ** Tracing
, TraceForkerEvent (..)
, TraceForkerEventWithKey (..)
, ForkerWasCommitted (..)

-- * Validation
, AnnLedgerError (..)
Expand Down Expand Up @@ -81,6 +82,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache as BlockCache
import Ouroboros.Consensus.Util.CallStack
import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -663,14 +665,14 @@ data TraceForkerEventWithKey

data TraceForkerEvent
= ForkerOpen
| ForkerCloseUncommitted
| ForkerCloseCommitted
| ForkerReadTablesStart
| ForkerReadTablesEnd
| ForkerRangeReadTablesStart
| ForkerRangeReadTablesEnd
| ForkerReadTables EnclosingTimed
| ForkerRangeReadTables EnclosingTimed
| ForkerReadStatistics
| ForkerPushStart
| ForkerPushEnd
| DanglingForkerClosed
| ForkerPush EnclosingTimed
| ForkerClose ForkerWasCommitted
deriving (Show, Eq)

data ForkerWasCommitted
= ForkerWasCommitted
| ForkerWasUncommitted
deriving (Eq, Show)
Original file line number Diff line number Diff line change
Expand Up @@ -852,13 +852,15 @@ newForker h ldbEnv (rk, releaseVar) rr dblog =
dblogVar <- newTVarIO dblog
forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \r -> (r, r + 1)
forkerMVar <- newMVar $ Left (ldbLock ldbEnv, ldbBackingStore ldbEnv, rr)
forkerCommitted <- newTVarIO False
let forkerEnv =
ForkerEnv
{ foeBackingStoreValueHandle = forkerMVar
, foeChangelog = dblogVar
, foeSwitchVar = ldbChangelog ldbEnv
, foeTracer =
LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
, foeWasCommitted = forkerCommitted
}
atomically $ do
-- Note that we add the forkerEnv to the 'ldbForkers' so that an exception
Expand Down Expand Up @@ -921,5 +923,7 @@ implForkerClose (LDBHandle varState) forkerKey env = do
(\m -> Map.updateLookupWithKey (\_ _ -> Nothing) forkerKey m)
case frk of
Nothing -> pure ()
Just e -> traceWith (foeTracer e) DanglingForkerClosed
Just e -> do
wc <- readTVarIO (foeWasCommitted e)
traceWith (foeTracer e) (ForkerClose $ if wc then ForkerWasCommitted else ForkerWasUncommitted)
closeForkerEnv env
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.V1.Forker
import qualified Control.Monad as Monad
import Control.ResourceRegistry
import Control.Tracer
import Data.Functor.Contravariant ((>$<))
import qualified Data.Map.Strict as Map
import Data.Semigroup
import qualified Data.Set as Set
Expand All @@ -43,6 +44,7 @@ import Ouroboros.Consensus.Storage.LedgerDB.V1.DiffSeq
)
import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.DiffSeq as DS
import Ouroboros.Consensus.Storage.LedgerDB.V1.Lock
import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike
import qualified Ouroboros.Network.AnchoredSeq as AS

Expand Down Expand Up @@ -72,6 +74,7 @@ data ForkerEnv m l blk = ForkerEnv
-- flushed, but 'forkerCommit' will take care of this.
, foeTracer :: !(Tracer m TraceForkerEvent)
-- ^ Config
, foeWasCommitted :: !(StrictTVar m Bool)
}
deriving Generic

Expand Down Expand Up @@ -132,53 +135,50 @@ implForkerReadTables ::
ForkerEnv m l blk ->
LedgerTables l KeysMK ->
m (LedgerTables l ValuesMK)
implForkerReadTables env ks = do
traceWith (foeTracer env) ForkerReadTablesStart
chlog <- readTVarIO (foeChangelog env)
bsvh <- getValueHandle env
unfwd <- readKeySetsWith bsvh (changelogLastFlushedState chlog) ks
case forwardTableKeySets chlog unfwd of
Left _err -> error "impossible!"
Right vs -> do
traceWith (foeTracer env) ForkerReadTablesEnd
pure vs
implForkerReadTables env ks =
encloseTimedWith (ForkerReadTables >$< foeTracer env) $ do
chlog <- readTVarIO (foeChangelog env)
bsvh <- getValueHandle env
unfwd <- readKeySetsWith bsvh (changelogLastFlushedState chlog) ks
case forwardTableKeySets chlog unfwd of
Left _err -> error "impossible!"
Right vs -> pure vs

implForkerRangeReadTables ::
(IOLike m, GetTip l, HasLedgerTables l) =>
QueryBatchSize ->
ForkerEnv m l blk ->
RangeQueryPrevious l ->
m (LedgerTables l ValuesMK, Maybe (TxIn l))
implForkerRangeReadTables qbs env rq0 = do
traceWith (foeTracer env) ForkerRangeReadTablesStart
ldb <- readTVarIO $ foeChangelog env
let
-- Get the differences without the keys that are greater or equal
-- than the maximum previously seen key.
diffs =
maybe
id
(ltliftA2 doDropLTE)
(BackingStore.rqPrev rq)
$ ltmap prj
$ changelogDiffs ldb
-- (1) Ensure that we never delete everything read from disk (ie if
-- our result is non-empty then it contains something read from
-- disk, as we only get an empty result if we reached the end of
-- the table).
--
-- (2) Also, read one additional key, which we will not include in
-- the result but need in order to know which in-memory
-- insertions to include.
maxDeletes = ltcollapse $ ltmap (K2 . numDeletesDiffMK) diffs
nrequested = 1 + max (BackingStore.rqCount rq) (1 + maxDeletes)
implForkerRangeReadTables qbs env rq0 =
encloseTimedWith (ForkerRangeReadTables >$< foeTracer env) $ do
ldb <- readTVarIO $ foeChangelog env
let
-- Get the differences without the keys that are greater or equal
-- than the maximum previously seen key.
diffs =
maybe
id
(ltliftA2 doDropLTE)
(BackingStore.rqPrev rq)
$ ltmap prj
$ changelogDiffs ldb
-- (1) Ensure that we never delete everything read from disk (ie if
-- our result is non-empty then it contains something read from
-- disk, as we only get an empty result if we reached the end of
-- the table).
--
-- (2) Also, read one additional key, which we will not include in
-- the result but need in order to know which in-memory
-- insertions to include.
maxDeletes = ltcollapse $ ltmap (K2 . numDeletesDiffMK) diffs
nrequested = 1 + max (BackingStore.rqCount rq) (1 + maxDeletes)

let st = changelogLastFlushedState ldb
bsvh <- getValueHandle env
(values, mx) <- BackingStore.bsvhRangeRead bsvh st (rq{BackingStore.rqCount = nrequested})
traceWith (foeTracer env) ForkerRangeReadTablesEnd
let res = ltliftA2 (doFixupReadResult nrequested) diffs values
pure (res, mx)
let st = changelogLastFlushedState ldb
bsvh <- getValueHandle env
(values, mx) <- BackingStore.bsvhRangeRead bsvh st (rq{BackingStore.rqCount = nrequested})
let res = ltliftA2 (doFixupReadResult nrequested) diffs values
pure (res, mx)
where
rq = BackingStore.RangeQuery rq1 (fromIntegral $ defaultQueryBatchSize qbs)

Expand Down Expand Up @@ -309,17 +309,16 @@ implForkerReadStatistics env = do
}

implForkerPush ::
(MonadSTM m, GetTip l, HasLedgerTables l) =>
(IOLike m, GetTip l, HasLedgerTables l) =>
ForkerEnv m l blk ->
l DiffMK ->
m ()
implForkerPush env newState = do
traceWith (foeTracer env) ForkerPushStart
atomically $ do
chlog <- readTVar (foeChangelog env)
let chlog' = extend newState chlog
writeTVar (foeChangelog env) chlog'
traceWith (foeTracer env) ForkerPushEnd
implForkerPush env newState =
encloseTimedWith (ForkerPush >$< foeTracer env) $ do
atomically $ do
chlog <- readTVar (foeChangelog env)
let chlog' = extend newState chlog
writeTVar (foeChangelog env) chlog'

implForkerCommit ::
(MonadSTM m, GetTip l, StandardHash l, HasLedgerTables l) =>
Expand Down Expand Up @@ -350,6 +349,7 @@ implForkerCommit env = do
, changelogDiffs =
ltliftA2 (doPrune s) (changelogDiffs orig) (changelogDiffs dblog)
}
Monad.void $ swapTVar (foeWasCommitted env) True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use writeTVar here, as in the corresponding place in the V2 implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good point. Will change it

where
-- Prune the diffs from the forker's log that have already been flushed to
-- disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,9 @@ implForkerClose (LDBHandle varState) forkerKey forkerEnv = do

case frk of
Nothing -> pure ()
Just e -> traceWith (foeTracer e) DanglingForkerClosed
Just e -> do
wc <- readTVarIO (foeWasCommitted e)
traceWith (foeTracer e) (ForkerClose $ if wc then ForkerWasCommitted else ForkerWasUncommitted)

closeForkerEnv forkerEnv

Expand All @@ -757,6 +759,7 @@ newForker h ldbEnv rr (rk, st) = do
traceWith tr ForkerOpen
lseqVar <- newTVarIO . LedgerSeq . AS.Empty $ st
foeCleanup <- newTVarIO $ pure ()
forkerCommitted <- newTVarIO False
let forkerEnv =
ForkerEnv
{ foeLedgerSeq = lseqVar
Expand All @@ -768,6 +771,7 @@ newForker h ldbEnv rr (rk, st) = do
, foeCleanup
, foeLedgerDbLock = ldbOpenHandlesLock ldbEnv
, foeLedgerDbToClose = ldbToClose ldbEnv
, foeWasCommitted = forkerCommitted
}
atomically $ modifyTVar (ldbForkers ldbEnv) $ Map.insert forkerKey forkerEnv
pure $
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.Forker
import Control.RAWLock (RAWLock)
import Control.ResourceRegistry
import Control.Tracer
import Data.Functor.Contravariant ((>$<))
import Data.Maybe (fromMaybe)
import GHC.Generics
import NoThunks.Class
Expand All @@ -35,6 +36,7 @@ import Ouroboros.Consensus.Storage.LedgerDB.Forker
import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq
import Ouroboros.Consensus.Util (whenJust)
import Ouroboros.Consensus.Util.CallStack
import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.NormalForm.StrictTVar ()
import qualified Ouroboros.Network.AnchoredSeq as AS
Expand Down Expand Up @@ -66,6 +68,7 @@ data ForkerEnv m l blk = ForkerEnv
-- LedgerDB and release the discarded ones.
, foeLedgerDbLock :: !(RAWLock m ())
-- ^ 'ldbOpenHandlesLock'.
, foeWasCommitted :: !(StrictTVar m Bool)
}
deriving Generic

Expand All @@ -79,36 +82,32 @@ deriving instance
NoThunks (ForkerEnv m l blk)

implForkerReadTables ::
(MonadSTM m, GetTip l) =>
(IOLike m, GetTip l) =>
ForkerEnv m l blk ->
LedgerTables l KeysMK ->
m (LedgerTables l ValuesMK)
implForkerReadTables env ks = do
traceWith (foeTracer env) ForkerReadTablesStart
lseq <- readTVarIO (foeLedgerSeq env)
let stateRef = currentHandle lseq
tbs <- read (tables stateRef) (state stateRef) ks
traceWith (foeTracer env) ForkerReadTablesEnd
pure tbs
implForkerReadTables env ks =
encloseTimedWith (ForkerReadTables >$< foeTracer env) $ do
lseq <- readTVarIO (foeLedgerSeq env)
let stateRef = currentHandle lseq
read (tables stateRef) (state stateRef) ks

implForkerRangeReadTables ::
(MonadSTM m, GetTip l, HasLedgerTables l) =>
(IOLike m, GetTip l, HasLedgerTables l) =>
QueryBatchSize ->
ForkerEnv m l blk ->
RangeQueryPrevious l ->
m (LedgerTables l ValuesMK, Maybe (TxIn l))
implForkerRangeReadTables qbs env rq0 = do
traceWith (foeTracer env) ForkerRangeReadTablesStart
ldb <- readTVarIO $ foeLedgerSeq env
let n = fromIntegral $ defaultQueryBatchSize qbs
stateRef = currentHandle ldb
case rq0 of
NoPreviousQuery -> readRange (tables stateRef) (state stateRef) (Nothing, n)
PreviousQueryWasFinal -> pure (LedgerTables emptyMK, Nothing)
PreviousQueryWasUpTo k -> do
tbs <- readRange (tables stateRef) (state stateRef) (Just k, n)
traceWith (foeTracer env) ForkerRangeReadTablesEnd
pure tbs
implForkerRangeReadTables qbs env rq0 =
encloseTimedWith (ForkerRangeReadTables >$< foeTracer env) $ do
ldb <- readTVarIO $ foeLedgerSeq env
let n = fromIntegral $ defaultQueryBatchSize qbs
stateRef = currentHandle ldb
case rq0 of
NoPreviousQuery -> readRange (tables stateRef) (state stateRef) (Nothing, n)
PreviousQueryWasFinal -> pure (LedgerTables emptyMK, Nothing)
PreviousQueryWasUpTo k ->
readRange (tables stateRef) (state stateRef) (Just k, n)

implForkerGetLedgerState ::
(MonadSTM m, GetTip l) =>
Expand All @@ -129,24 +128,23 @@ implForkerPush ::
ForkerEnv m l blk ->
l DiffMK ->
m ()
implForkerPush env newState = do
traceWith (foeTracer env) ForkerPushStart
lseq <- readTVarIO (foeLedgerSeq env)
implForkerPush env newState =
encloseTimedWith (ForkerPush >$< foeTracer env) $ do
lseq <- readTVarIO (foeLedgerSeq env)

let st0 = current lseq
st = forgetLedgerTables newState
let st0 = current lseq
st = forgetLedgerTables newState

bracketOnError
(duplicate (tables $ currentHandle lseq) (foeResourceRegistry env))
(release . fst)
( \(_, newtbs) -> do
pushDiffs newtbs st0 newState
bracketOnError
(duplicate (tables $ currentHandle lseq) (foeResourceRegistry env))
(release . fst)
( \(_, newtbs) -> do
pushDiffs newtbs st0 newState

let lseq' = extend (StateRef st newtbs) lseq
let lseq' = extend (StateRef st newtbs) lseq

traceWith (foeTracer env) ForkerPushEnd
atomically $ writeTVar (foeLedgerSeq env) lseq'
)
atomically $ writeTVar (foeLedgerSeq env) lseq'
)

implForkerCommit ::
(IOLike m, GetTip l, StandardHash l) =>
Expand Down Expand Up @@ -183,6 +181,7 @@ implForkerCommit env = do
)
whenJust ldbToClose (modifyTVar foeLedgerDbToClose . (:))
writeTVar foeCleanup transfer
writeTVar foeWasCommitted True
where
ForkerEnv
{ foeLedgerSeq
Expand All @@ -191,6 +190,7 @@ implForkerCommit env = do
, foeLedgerDbRegistry
, foeCleanup
, foeLedgerDbToClose
, foeWasCommitted
} = env

theImpossible =
Expand Down
Loading