Skip to content

Commit

Permalink
Run downloading, unpacking, and saving in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPenner committed Feb 26, 2025
1 parent 9ac7f3b commit 7ee97f1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
2 changes: 2 additions & 0 deletions unison-cli/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ library:
- containers >= 0.6.3
- conduit
- conduit-extra
- stm-conduit
- stm-chans
- cryptonite
- either
- errors
Expand Down
37 changes: 32 additions & 5 deletions unison-cli/src/Unison/Share/SyncV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ where
import Codec.Serialise qualified as CBOR
import Conduit (ConduitT)
import Conduit qualified as C
import Control.Concurrent.STM.TBMQueue qualified as STM
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
Expand All @@ -24,6 +25,7 @@ import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Conduit.TQueue qualified as TQueue
import Data.Conduit.Zlib qualified as C
import Data.Foldable qualified as Foldable
import Data.Graph qualified as Graph
Expand Down Expand Up @@ -68,6 +70,8 @@ import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
import UnliftIO qualified as IO
import UnliftIO.Async qualified as Async
import qualified UnliftIO.STM as STM

type Stream i o = ConduitT i o StreamM ()

Expand Down Expand Up @@ -255,11 +259,34 @@ syncSortedStream shouldValidate codebase stream = do
let handler :: Stream (Vector (Hash32, TempEntity)) o
handler = C.mapM_C \entityBatch -> do
validateAndSave shouldValidate codebase entityBatch
C.runConduit $
stream
C..| CL.chunksOf batchSize
C..| unpackChunks codebase
C..| handler
downloadQ <- liftIO $ STM.newTBMQueueIO 10 -- 10 batches, not 10 entities
unpackerQ <- liftIO $ STM.newTBMQueueIO 10
let downloaderSink = TQueue.sinkTBMQueue downloadQ
let downloaderSource = TQueue.sourceTBMQueue downloadQ
let unpackerSink = TQueue.sinkTBMQueue unpackerQ
let unpackedSource = TQueue.sourceTBMQueue unpackerQ
let downloadC =
stream
C..| CL.chunksOf batchSize
C..| downloaderSink
let saverC =
downloaderSource
C..| unpackChunks codebase
C..| unpackerSink
let handlerC =
unpackedSource
C..| handler

-- Run the three conduits concurrently, and wait for them all to finish, fail if any of them fail.
ExceptT . Async.runConc $ do
a <- Async.conc . runExceptT $ do
C.runConduit downloadC
STM.atomically $ STM.closeTBMQueue downloadQ
b <- Async.conc . runExceptT $ do
C.runConduit saverC
STM.atomically $ STM.closeTBMQueue unpackerQ
c <- Async.conc . runExceptT $ C.runConduit handlerC
pure (a >> b >> c)

-- | Topologically sort entities based on their dependencies, returning a list in dependency-first order.
sortDependencyFirst :: (Foldable f, Functor f) => f (Hash32, TempEntity) -> [(Hash32, TempEntity)]
Expand Down
2 changes: 2 additions & 0 deletions unison-cli/unison-cli.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ library
, servant-client
, servant-conduit
, stm
, stm-chans
, stm-conduit
, temporary
, text
, text-ansi
Expand Down

0 comments on commit 7ee97f1

Please sign in to comment.