Skip to content

Commit

Permalink
Run downloading, unpacking, and saving in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPenner committed Feb 26, 2025
1 parent 9ac7f3b commit e6b7466
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
2 changes: 2 additions & 0 deletions unison-cli/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ library:
- containers >= 0.6.3
- conduit
- conduit-extra
- stm-conduit
- stm-chans
- cryptonite
- either
- errors
Expand Down
32 changes: 27 additions & 5 deletions unison-cli/src/Unison/Share/SyncV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ where
import Codec.Serialise qualified as CBOR
import Conduit (ConduitT)
import Conduit qualified as C
import Control.Concurrent.STM.TBMQueue qualified as STM
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
Expand All @@ -24,6 +25,7 @@ import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Conduit.TQueue qualified as TQueue
import Data.Conduit.Zlib qualified as C
import Data.Foldable qualified as Foldable
import Data.Graph qualified as Graph
Expand Down Expand Up @@ -68,6 +70,7 @@ import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
import UnliftIO qualified as IO
import UnliftIO.Async qualified as Async

type Stream i o = ConduitT i o StreamM ()

Expand Down Expand Up @@ -255,11 +258,30 @@ syncSortedStream shouldValidate codebase stream = do
let handler :: Stream (Vector (Hash32, TempEntity)) o
handler = C.mapM_C \entityBatch -> do
validateAndSave shouldValidate codebase entityBatch
C.runConduit $
stream
C..| CL.chunksOf batchSize
C..| unpackChunks codebase
C..| handler
downloadQ <- liftIO $ STM.newTBMQueueIO 10 -- 10 batches, not 10 entities
unpackerQ <- liftIO $ STM.newTBMQueueIO 10
let downloaderSink = TQueue.sinkTBMQueue downloadQ
let downloaderSource = TQueue.sourceTBMQueue downloadQ
let unpackerSink = TQueue.sinkTBMQueue unpackerQ
let unpackedSource = TQueue.sourceTBMQueue unpackerQ
let downloadC =
stream
C..| CL.chunksOf batchSize
C..| downloaderSink
let saverC =
downloaderSource
C..| unpackChunks codebase
C..| unpackerSink
let handlerC =
unpackedSource
C..| handler

-- Run the three conduits concurrently, and wait for them all to finish, fail if any of them fail.
ExceptT . Async.runConc $ do
a <- Async.conc . runExceptT $ C.runConduit downloadC
b <- Async.conc . runExceptT $ C.runConduit saverC
c <- Async.conc . runExceptT $ C.runConduit handlerC
pure (a >> b >> c)

-- | Topologically sort entities based on their dependencies, returning a list in dependency-first order.
sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
Expand Down
2 changes: 2 additions & 0 deletions unison-cli/unison-cli.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ library
, servant-client
, servant-conduit
, stm
, stm-chans
, stm-conduit
, temporary
, text
, text-ansi
Expand Down

0 comments on commit e6b7466

Please sign in to comment.