Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
99 changes: 67 additions & 32 deletions src/Ambar/Emulator/Connector/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ module Ambar.Emulator.Connector.Postgres

import Control.Concurrent.STM (STM, TVar, newTVarIO, readTVar)
import Control.Exception (bracket, throwIO, ErrorCall(..))
import Control.Monad (foldM)
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as Aeson
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as LB
import Data.Default (Default(..))
import Data.List ((\\))
import Data.List.Extra (chunksOf)
import Data.Map.Strict (Map)
import Data.Maybe (fromMaybe)
import Data.Maybe (fromMaybe, listToMaybe)
import qualified Data.Map.Strict as Map
import Data.String (fromString)
import Data.Time.LocalTime (localTimeToUTC, utc)
Expand Down Expand Up @@ -46,6 +48,8 @@ _POLLING_INTERVAL = millis 50
_MAX_TRANSACTION_TIME :: Duration
_MAX_TRANSACTION_TIME = seconds 120

_MAX_BOUNDARY_BATCH_SIZE :: Int
_MAX_BOUNDARY_BATCH_SIZE = 500

data PostgreSQL = PostgreSQL
{ c_host :: Text
Expand Down Expand Up @@ -154,41 +158,72 @@ connect config@PostgreSQL{..} logger (PostgreSQLState tracker) producer f =

parser = mkParser (columns config) schema

opts = P.FoldOptions
{ P.fetchQuantity = P.Automatic
, P.transactionMode = P.TransactionMode
run :: Boundaries -> Stream Record
run (Boundaries bs) acc0 emit =
P.withTransactionMode txMode conn $ do
let batches = chunksOf _MAX_BOUNDARY_BATCH_SIZE bs
highest = fmap snd . listToMaybe . reverse
case reverse batches of
[] ->
runBatch Nothing Nothing [] acc0
(lastBatch : restReversed) -> do
let initBatches = reverse restReversed
(acc, mLastUpper) <-
foldM
(\(acc, mLower) batch -> do
let mUpper = highest batch
acc' <- runBatch mLower mUpper batch acc
return (acc', mUpper)
)
(acc0, Nothing)
initBatches
-- last batch has no upper bound
runBatch mLastUpper Nothing lastBatch acc
where
txMode = P.TransactionMode
{ P.isolationLevel = P.ReadCommitted
, P.readWriteMode = P.ReadOnly
}
}

run :: Boundaries -> Stream Record
run (Boundaries bs) acc0 emit = do
logDebug logger query
(acc, count) <- P.foldWithOptionsAndParser opts parser conn (fromString query) () (acc0, 0) $
\(acc, !count) record -> do
logResult record
acc' <- emit acc record
return (acc', succ count)
logDebug logger $ "results: " <> show @Int count
return acc
where
query = fromString $ Text.unpack $ renderPretty $ Pretty.fillSep
[ "SELECT" , commaSeparated $ map pretty $ columns config
, "FROM" , pretty c_table
, if null bs then "" else "WHERE" <> constraints
, "ORDER BY" , pretty c_serialColumn
]

constraints = sepBy "AND"
[ Pretty.fillSep
[ "("
, pretty c_serialColumn, "<", pretty low
, "OR"
, pretty high, "<", pretty c_serialColumn
, ")"]
| (EntryId low, EntryId high) <- bs
]
runBatch mLower mUpper batchBs acc = do
logDebug logger batchQuery
(acc', count) <- P.foldWith_ parser conn (fromString batchQuery) (acc, 0 :: Int)
(\(a, n) record -> do
logResult record
a' <- emit a record
return (a', n + 1)
)
logDebug logger $ "results: " <> show count
return acc'
where
batchQuery = fromString $ Text.unpack $ renderPretty $ Pretty.fillSep
[ "SELECT" , commaSeparated $ map pretty $ columns config
, "FROM" , pretty c_table
, if null allConstraints then "" else "WHERE" <+> sepBy "AND" allConstraints
, "ORDER BY" , pretty c_serialColumn
]

allConstraints = lowerBound ++ upperBound ++ exclusions

lowerBound = case mLower of
Nothing -> []
Just (EntryId low) ->
[Pretty.fillSep [pretty low, "<", pretty c_serialColumn]]

upperBound = case mUpper of
Nothing -> []
Just (EntryId high) ->
[Pretty.fillSep [pretty c_serialColumn, "<=", pretty high]]

exclusions =
[ Pretty.fillSep
[ "("
, pretty c_serialColumn, "<", pretty low
, "OR"
, pretty high, "<", pretty c_serialColumn
, ")"]
| (EntryId low, EntryId high) <- batchBs
]

logResult row =
logInfo logger $ renderPretty $
Expand Down
13 changes: 13 additions & 0 deletions tests/Test/Connector/PostgreSQL.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ testPostgreSQL p = do
describe "PostgreSQL" $ do
testGenericSQL with

it "handles large boundary lists" $ do
with (PartitionCount 1) $ \conn table topic connected -> do
void $ P.execute_ conn
(fromString $ "ALTER SEQUENCE " <> tableName table <> "_id_seq INCREMENT BY 2")
let n = 10000
insert conn table (take n $ head $ mocks table)
connected $
deadline (seconds 10) $
Topic.withConsumer topic group $ \consumer -> do
forM_ [1..n] $ \_ -> void $ readEntry @Event consumer
insert conn table [head (mocks table !! 1)]
void $ readEntry @Event consumer

-- Test that column types are supported/unsupported by
-- creating database entries with the value and reporting
-- on the emulator's behaviour when trying to decode them.
Expand Down