Batch the boundary exclusion clauses of the PostgreSQL connector query.

NOTE(review): this patch was re-flowed from a whitespace-collapsed copy.
Indentation and blank context lines are reconstructed -- check them against
the target files, or apply with `git apply --ignore-whitespace`.
The `index` lines still carry the blob hashes of the original patch.

diff --git a/src/Ambar/Emulator/Connector/Postgres.hs b/src/Ambar/Emulator/Connector/Postgres.hs
index 4ff63e7..c451091 100644
--- a/src/Ambar/Emulator/Connector/Postgres.hs
+++ b/src/Ambar/Emulator/Connector/Postgres.hs
@@ -5,14 +5,16 @@ module Ambar.Emulator.Connector.Postgres
 
 import Control.Concurrent.STM (STM, TVar, newTVarIO, readTVar)
 import Control.Exception (bracket, throwIO, ErrorCall(..))
+import Control.Monad (foldM)
 import Data.Aeson (FromJSON, ToJSON)
 import qualified Data.Aeson as Aeson
 import Data.ByteString (ByteString)
 import qualified Data.ByteString.Lazy as LB
 import Data.Default (Default(..))
 import Data.List ((\\))
+import Data.List.Extra (chunksOf)
 import Data.Map.Strict (Map)
-import Data.Maybe (fromMaybe)
+import Data.Maybe (fromMaybe, listToMaybe)
 import qualified Data.Map.Strict as Map
 import Data.String (fromString)
 import Data.Time.LocalTime (localTimeToUTC, utc)
@@ -46,6 +48,8 @@ _POLLING_INTERVAL = millis 50
 
 _MAX_TRANSACTION_TIME :: Duration
 _MAX_TRANSACTION_TIME = seconds 120
+_MAX_BOUNDARY_BATCH_SIZE :: Int
+_MAX_BOUNDARY_BATCH_SIZE = 500
 
 data PostgreSQL = PostgreSQL
   { c_host :: Text
@@ -154,41 +158,81 @@ connect config@PostgreSQL{..} logger (PostgreSQLState tracker) producer f =
 
     parser = mkParser (columns config) schema
 
-    opts = P.FoldOptions
-      { P.fetchQuantity = P.Automatic
-      , P.transactionMode = P.TransactionMode
+    -- Stream the records that fall outside the given boundaries.
+    -- The boundaries are split into batches of at most
+    -- _MAX_BOUNDARY_BATCH_SIZE entries and one query is issued per
+    -- batch, all inside a single read-only transaction.
+    run :: Boundaries -> Stream Record
+    run (Boundaries bs) acc0 emit =
+      P.withTransactionMode txMode conn $ do
+        let batches = chunksOf _MAX_BOUNDARY_BATCH_SIZE bs
+            highest = fmap snd . listToMaybe . reverse
+        case reverse batches of
+          [] ->
+            runBatch Nothing Nothing [] acc0
+          (lastBatch : restReversed) -> do
+            let initBatches = reverse restReversed
+            (acc, mLastUpper) <-
+              foldM
+                (\(acc, mLower) batch -> do
+                    let mUpper = highest batch
+                    acc' <- runBatch mLower mUpper batch acc
+                    return (acc', mUpper)
+                )
+                (acc0, Nothing)
+                initBatches
+            -- last batch has no upper bound
+            runBatch mLastUpper Nothing lastBatch acc
+      where
+        txMode = P.TransactionMode
           { P.isolationLevel = P.ReadCommitted
           , P.readWriteMode = P.ReadOnly
           }
-      }
 
-    run :: Boundaries -> Stream Record
-    run (Boundaries bs) acc0 emit = do
-      logDebug logger query
-      (acc, count) <- P.foldWithOptionsAndParser opts parser conn (fromString query) () (acc0, 0) $
-        \(acc, !count) record -> do
-          logResult record
-          acc' <- emit acc record
-          return (acc', succ count)
-      logDebug logger $ "results: " <> show @Int count
-      return acc
-      where
-        query = fromString $ Text.unpack $ renderPretty $ Pretty.fillSep
-          [ "SELECT" , commaSeparated $ map pretty $ columns config
-          , "FROM" , pretty c_table
-          , if null bs then "" else "WHERE" <> constraints
-          , "ORDER BY" , pretty c_serialColumn
-          ]
-
-        constraints = sepBy "AND"
-          [ Pretty.fillSep
-              [ "("
-              , pretty c_serialColumn, "<", pretty low
-              , "OR"
-              , pretty high, "<", pretty c_serialColumn
-              , ")"]
-          | (EntryId low, EntryId high) <- bs
-          ]
+        -- Run the query for a single batch of boundaries, restricted to
+        -- serial values in (mLower, mUpper]; a bound of Nothing leaves that
+        -- side open. Returns the accumulator after emitting every row.
+        runBatch mLower mUpper batchBs acc = do
+          logDebug logger batchQuery
+          -- the bang on the row counter keeps it strict, so the fold does
+          -- not build up one thunk per row before the count is logged
+          (acc', count) <- P.foldWith_ parser conn (fromString batchQuery) (acc, 0 :: Int)
+            (\(a, !n) record -> do
+                logResult record
+                a' <- emit a record
+                return (a', n + 1)
+            )
+          logDebug logger $ "results: " <> show count
+          return acc'
+          where
+            batchQuery = fromString $ Text.unpack $ renderPretty $ Pretty.fillSep
+              [ "SELECT" , commaSeparated $ map pretty $ columns config
+              , "FROM" , pretty c_table
+              , if null allConstraints then "" else "WHERE" <+> sepBy "AND" allConstraints
+              , "ORDER BY" , pretty c_serialColumn
+              ]
+
+            allConstraints = lowerBound ++ upperBound ++ exclusions
+
+            lowerBound = case mLower of
+              Nothing -> []
+              Just (EntryId low) ->
+                [Pretty.fillSep [pretty low, "<", pretty c_serialColumn]]
+
+            upperBound = case mUpper of
+              Nothing -> []
+              Just (EntryId high) ->
+                [Pretty.fillSep [pretty c_serialColumn, "<=", pretty high]]
+
+            exclusions =
+              [ Pretty.fillSep
+                  [ "("
+                  , pretty c_serialColumn, "<", pretty low
+                  , "OR"
+                  , pretty high, "<", pretty c_serialColumn
+                  , ")"]
+              | (EntryId low, EntryId high) <- batchBs
+              ]
 
     logResult row =
       logInfo logger $ renderPretty $
diff --git a/tests/Test/Connector/PostgreSQL.hs b/tests/Test/Connector/PostgreSQL.hs
index d0050fc..6de6ca9 100644
--- a/tests/Test/Connector/PostgreSQL.hs
+++ b/tests/Test/Connector/PostgreSQL.hs
@@ -55,6 +55,19 @@ testPostgreSQL p = do
   describe "PostgreSQL" $ do
     testGenericSQL with
 
+    it "handles large boundary lists" $ do
+      with (PartitionCount 1) $ \conn table topic connected -> do
+        void $ P.execute_ conn
+          (fromString $ "ALTER SEQUENCE " <> tableName table <> "_id_seq INCREMENT BY 2")
+        let n = 10000
+        insert conn table (take n $ head $ mocks table)
+        connected $
+          deadline (seconds 10) $
+          Topic.withConsumer topic group $ \consumer -> do
+            forM_ [1..n] $ \_ -> void $ readEntry @Event consumer
+            insert conn table [head (mocks table !! 1)]
+            void $ readEntry @Event consumer
+
     -- Test that column types are supported/unsupported by
     -- creating database entries with the value and reporting
     -- on the emulator's behaviour when trying to decode them.