Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/Ambar/Emulator/Queue/Partition/File.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import qualified GHC.IO.Device as FD
import System.IO
( Handle
, hSeek
, hSetFileSize
, withFile
, openFile
, hClose
Expand Down Expand Up @@ -115,13 +116,15 @@ data OpenError
= AlreadyOpen FilePath
| MissingRecords FilePath
| MissingIndex FilePath
| IndexAheadOfRecords FilePath
deriving Show

instance Exception OpenError where
displayException = \case
AlreadyOpen path -> "Lock found. Partition already open: " <> path
MissingRecords path -> "Missing records file: " <> path
MissingIndex path -> "Missing index file: " <> path
IndexAheadOfRecords path -> "Records file is smaller than its index claims: " <> path

open :: HasCallStack => FilePath -> String -> IO FilePartition
open location name = do
Expand All @@ -132,7 +135,7 @@ open location name = do
(count, size) <-
if not exists
then createIndex index
else loadIndex index
else loadIndex records index

writeFile lock "locked"
fd_records <- openNonLockingWritableFD records
Expand Down Expand Up @@ -165,16 +168,22 @@ open location name = do
LB.writeFile index $ Binary.encode @Word64 0
return (Count 0, Bytes 0)

loadIndex index = do
loadIndex records index = do
indexSize <- fromIntegral <$> getFileSize index
let count = (indexSize `div` _WORD64_SIZE) - 1
lastIndexEntry <- readIndexEntry index (Offset count)
-- This should be the same as the size of the records file.
-- However we use the last index entry in case the program
-- was interrupted between writing to the records file and
-- to the index file.
let byteOffsetOfNextEntry = lastIndexEntry
return (Count count, byteOffsetOfNextEntry)
recordsSize <- getFileSize records
let Bytes lastEntry = lastIndexEntry
when (recordsSize < fromIntegral lastEntry) $
throwIO $ IndexAheadOfRecords records
-- If the records file is larger than the last index entry, a record
-- was appended but the program was interrupted before its index entry
-- was written. Discard the orphan bytes so the records file end matches
-- the index, keeping subsequent appends consistent.
when (recordsSize > fromIntegral lastEntry) $
withFile records ReadWriteMode $ \h ->
hSetFileSize h (fromIntegral lastEntry)
return (Count count, lastIndexEntry)

close :: HasCallStack => FilePartition -> IO ()
close FilePartition{..} =
Expand Down
51 changes: 51 additions & 0 deletions tests/Test/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import qualified Data.Text as Text
import Data.Foldable (traverse_)
import qualified Data.Text.Encoding as Text
import Data.Typeable (Typeable)
import System.Directory (getFileSize)
import System.FilePath ((</>))
import System.IO (IOMode(..), hSetFileSize, withFile)
import System.IO.Temp (withSystemTempDirectory)
import Test.Hspec
( Spec
Expand Down Expand Up @@ -55,6 +58,8 @@ testQueues = do
testQueue
describe "partition" $ do
testPartition withFilePartition
describe "file partition" $ do
testFilePartition
describe "topic" $ do
testTopic withFileTopic
where
Expand Down Expand Up @@ -340,6 +345,52 @@ instance Partition FailPartition where
read _ = throwIO FailPartition
write _ _ = return ()

testFilePartition :: Spec
testFilePartition = do
it "fails to open when records file is smaller than last index entry" $
withTempPath $ \path -> do
let name = "file-partition"
records = path </> name <> ".records"

FilePartition.withFilePartition path name $ \partition ->
traverse_ (P.write partition) (take 5 messages)

size <- getFileSize records
withFile records ReadWriteMode $ \h ->
hSetFileSize h (size - 1)

let isInconsistent e
| Just (FilePartition.IndexAheadOfRecords _) <- fromException e = True
| otherwise = False

FilePartition.withFilePartition path name (const $ return ())
`shouldThrow` isInconsistent

it "truncates records file to last index entry when records is larger" $
withTempPath $ \path -> do
let name = "file-partition"
records = path </> name <> ".records"
(initial, more) = splitAt 5 (take 10 messages)

FilePartition.withFilePartition path name $ \partition ->
traverse_ (P.write partition) initial

originalSize <- getFileSize records

-- Simulate an interrupted write: extra bytes in the records file
-- with no corresponding index entry.
appendFile records "orphan bytes\n"

FilePartition.withFilePartition path name $ \partition -> do
afterSize <- getFileSize records
afterSize `shouldBe` originalSize

traverse_ (P.write partition) more

P.withReader partition Beginning $ \reader -> do
rs <- replicateM (length initial + length more) (snd <$> P.read reader)
rs `shouldBe` initial ++ more

testPartition :: Partition a => (forall b. FilePath -> (a -> IO b) -> IO b) -> Spec
testPartition with = do
it "reads what is written" $
Expand Down
Loading