Skip to content

Commit

Permalink
Add whimsy to progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPenner committed Feb 26, 2025
1 parent 13e52f8 commit 3b3d436
Showing 1 changed file with 89 additions and 49 deletions.
138 changes: 89 additions & 49 deletions unison-cli/src/Unison/Share/SyncV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ where
import Codec.Serialise qualified as CBOR
import Conduit (ConduitT)
import Conduit qualified as C
import Control.Category ((<<<))
import Control.Concurrent.STM.TBMQueue qualified as STM
import Control.Lens
import Control.Monad.Except
Expand All @@ -31,6 +32,7 @@ import Data.Graph qualified as Graph
import Data.Map qualified as Map
import Data.Proxy
import Data.Set qualified as Set
import Data.Text qualified as Text
import Data.Text.IO qualified as Text
import Data.Text.Lazy qualified as Text.Lazy
import Data.Text.Lazy.Encoding qualified as Text.Lazy
Expand Down Expand Up @@ -66,6 +68,7 @@ import Unison.SyncV2.API (Routes (downloadEntitiesStream))
import Unison.SyncV2.API qualified as SyncV2
import Unison.SyncV2.Types (CBORBytes, CBORStream, DependencyType (..))
import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Monoid qualified as Monoid
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
import UnliftIO qualified as IO
Expand Down Expand Up @@ -225,23 +228,26 @@ batchValidateEntities entities = do
syncUnsortedStream ::
Bool ->
(Codebase.Codebase IO v a) ->
(Maybe Word64) ->
Stream () SyncV2.EntityChunk ->
StreamM ()
syncUnsortedStream shouldValidate codebase stream = do
allEntities <-
C.runConduit $
stream
C..| CL.chunksOf batchSize
C..| unpackChunks codebase
C..| validateBatch
C..| C.concat
C..| C.sinkVector @Vector
let sortedEntities = sortDependencyFirst allEntities
liftIO $ withEntitySavingCallback (Just $ Vector.length allEntities) \countC -> do
Codebase.runTransaction codebase $ for_ sortedEntities \(hash, entity) -> do
r <- Q.saveTempEntityInMain v2HashHandle hash entity
Sqlite.unsafeIO $ countC 1
pure r
syncUnsortedStream shouldValidate codebase numEntities stream = ExceptT $ do
withStreamProgressCallback (fromIntegral <$> numEntities) \countC -> runExceptT do
allEntities <-
C.runConduit $
stream
C..| countC
C..| CL.chunksOf batchSize
C..| unpackChunks codebase
C..| validateBatch
C..| C.concat
C..| C.sinkVector @Vector
let sortedEntities = sortDependencyFirst allEntities
liftIO $ withEntitySavingCallback (Just $ Vector.length allEntities) \countC -> do
Codebase.runTransaction codebase $ for_ sortedEntities \(hash, entity) -> do
r <- Q.saveTempEntityInMain v2HashHandle hash entity
Sqlite.unsafeIO $ countC 1
pure r
where
validateBatch :: Stream (Vector (Hash32, TempEntity)) (Vector (Hash32, TempEntity))
validateBatch = C.iterM \entities -> do
Expand All @@ -252,32 +258,37 @@ syncUnsortedStream shouldValidate codebase stream = do
syncSortedStream ::
Bool ->
(Codebase.Codebase IO v a) ->
(Maybe Word64) ->
Stream () SyncV2.EntityChunk ->
StreamM ()
syncSortedStream shouldValidate codebase stream = do
(downloaderSink, downloaderSource) <- parallelSinkAndSource 10
(unpackerSink, unpackerSource) <- parallelSinkAndSource 10
let handler :: Stream (Vector (Hash32, TempEntity)) o
handler = C.mapM_C \entityBatch -> do
validateAndSave shouldValidate codebase entityBatch
let downloadC =
stream
C..| CL.chunksOf batchSize
C..| downloaderSink
let saverC =
downloaderSource
C..| unpackChunks codebase
C..| unpackerSink
let handlerC =
unpackerSource
C..| handler

-- Run the three conduits concurrently, and wait for them all to finish, fail if any of them fail.
ExceptT . Async.runConc $ do
a <- Async.conc . runExceptT $ C.runConduit downloadC
b <- Async.conc . runExceptT $ C.runConduit saverC
c <- Async.conc . runExceptT $ C.runConduit handlerC
pure (a >> b >> c)
syncSortedStream shouldValidate codebase numEntities stream = ExceptT do
withSortedStreamProgress (fromIntegral <$> numEntities) \(downloadCount, doneDownloading, unpackCount, doneUnpacking, saveCount) -> runExceptT do
(downloaderSink, downloaderSource) <- parallelSinkAndSource 10
(unpackerSink, unpackerSource) <- parallelSinkAndSource 10
let handler :: Stream (Vector (Hash32, TempEntity)) o
handler = C.mapM_C \entityBatch -> do
validateAndSave shouldValidate codebase entityBatch
saveCount (length entityBatch)
let downloadC =
stream
C..| CL.chunksOf batchSize
C..| C.iterM (downloadCount <<< length)
C..| (downloaderSink *> lift doneDownloading)
let saverC =
downloaderSource
C..| unpackChunks codebase
C..| C.iterM (unpackCount <<< length)
C..| (unpackerSink *> lift doneUnpacking)
let handlerC =
unpackerSource
C..| handler

-- Run the three conduits concurrently, and wait for them all to finish, fail if any of them fail.
ExceptT . Async.runConc $ do
a <- Async.conc . runExceptT $ C.runConduit downloadC
b <- Async.conc . runExceptT $ C.runConduit saverC
c <- Async.conc . runExceptT $ C.runConduit handlerC
pure (a >> b >> c)

-- | Topologically sort entities based on their dependencies, returning a list in dependency-first order.
sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
Expand Down Expand Up @@ -316,17 +327,14 @@ streamIntoCodebase ::
SyncV2.StreamInitInfo ->
Stream () SyncV2.EntityChunk ->
StreamM ()
streamIntoCodebase shouldValidate codebase SyncV2.StreamInitInfo {version, entitySorting, numEntities = numEntities} stream = ExceptT do
withStreamProgressCallback (fromIntegral <$> numEntities) \countC -> runExceptT do
-- Add a counter to the stream to track how many entities we've processed.
let stream' = stream C..| countC
case version of
(SyncV2.Version 1) -> pure ()
v -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorUnsupportedVersion v
streamIntoCodebase shouldValidate codebase SyncV2.StreamInitInfo {version, entitySorting, numEntities = numEntities} stream = do
case version of
(SyncV2.Version 1) -> pure ()
v -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorUnsupportedVersion v

case entitySorting of
SyncV2.DependenciesFirst -> syncSortedStream shouldValidate codebase stream'
SyncV2.Unsorted -> syncUnsortedStream shouldValidate codebase stream'
case entitySorting of
SyncV2.DependenciesFirst -> syncSortedStream shouldValidate codebase numEntities stream
SyncV2.Unsorted -> syncUnsortedStream shouldValidate codebase numEntities stream

-- | A sanity-check to verify that the hash we expected to import from the stream was successfully loaded into the codebase.
afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> ExceptT (SyncError SyncV2.PullError) IO ()
Expand Down Expand Up @@ -661,6 +669,38 @@ withEntityLoadingCallback action = do
let msg n = "\n 📦 Unpacked " <> tShow n <> " entities...\n\n"
counterProgress msg action

withSortedStreamProgress :: (MonadIO m, MonadUnliftIO n) => Maybe Int -> ((Int -> m (), m (), Int -> m (), m (), Int -> m ()) -> n a) -> n a
withSortedStreamProgress total action = do
downloadedVar <- IO.newTVarIO (0 :: Int)
doneDownloadingVar <- IO.newTVarIO False
unpackedVar <- IO.newTVarIO (0 :: Int)
doneUnpackingVar <- IO.newTVarIO False
savedVar <- IO.newTVarIO (0 :: Int)
IO.withRunInIO \toIO -> do
Console.Regions.displayConsoleRegions do
Console.Regions.withConsoleRegion Console.Regions.Linear \region -> do
Console.Regions.setConsoleRegion region do
downloaded <- IO.readTVar downloadedVar
doneDownloading <- IO.readTVar doneDownloadingVar
unpacked <- IO.readTVar unpackedVar
doneUnpacking <- IO.readTVar doneUnpackingVar
saved <- IO.readTVar savedVar
pure $
Text.unlines
[ "",
" ⬇️ Downloaded: " <> tShow downloaded <> maybe "" (\total -> " / " <> tShow total) total <> Monoid.whenM doneDownloading " 🏁",
" 📦 Unpacked: " <> tShow unpacked <> Monoid.whenM doneUnpacking " 🏁",
" 💾 Saved: " <> tShow saved
]
toIO $
action $
( \i -> do liftIO $ IO.atomically (IO.modifyTVar' downloadedVar (+ i)),
do liftIO $ IO.atomically (IO.writeTVar doneDownloadingVar True),
\i -> do liftIO $ IO.atomically (IO.modifyTVar' unpackedVar (+ i)),
do liftIO $ IO.atomically (IO.writeTVar doneUnpackingVar True),
\i -> do liftIO $ IO.atomically (IO.modifyTVar' savedVar (+ i))
)

-- * Conduit helpers

parallelSinkAndSource :: (MonadIO m) => Int -> m (ConduitT i void1 m (), ConduitT void2 i m ())
Expand Down

0 comments on commit 3b3d436

Please sign in to comment.