diff --git a/src/Ambar/Emulator/Queue/Partition/File.hs b/src/Ambar/Emulator/Queue/Partition/File.hs index 5250df8..39dc0b0 100644 --- a/src/Ambar/Emulator/Queue/Partition/File.hs +++ b/src/Ambar/Emulator/Queue/Partition/File.hs @@ -43,6 +43,7 @@ import qualified GHC.IO.Device as FD import System.IO ( Handle , hSeek + , hSetFileSize , withFile , openFile , hClose @@ -115,6 +116,7 @@ data OpenError = AlreadyOpen FilePath | MissingRecords FilePath | MissingIndex FilePath + | IndexAheadOfRecords FilePath deriving Show instance Exception OpenError where @@ -122,6 +124,7 @@ instance Exception OpenError where AlreadyOpen path -> "Lock found. Partition already open: " <> path MissingRecords path -> "Missing records file: " <> path MissingIndex path -> "Missing index file: " <> path + IndexAheadOfRecords path -> "Records file is smaller than its index claims: " <> path open :: HasCallStack => FilePath -> String -> IO FilePartition open location name = do @@ -132,7 +135,7 @@ open location name = do (count, size) <- if not exists then createIndex index - else loadIndex index + else loadIndex records index writeFile lock "locked" fd_records <- openNonLockingWritableFD records @@ -165,16 +168,22 @@ open location name = do LB.writeFile index $ Binary.encode @Word64 0 return (Count 0, Bytes 0) - loadIndex index = do + loadIndex records index = do indexSize <- fromIntegral <$> getFileSize index let count = (indexSize `div` _WORD64_SIZE) - 1 lastIndexEntry <- readIndexEntry index (Offset count) - -- This should be the same as the size of the records file. - -- However we use the last index entry in case the program - -- was interrupted between writing to the records file and - -- to the index file. - let byteOffsetOfNextEntry = lastIndexEntry - return (Count count, byteOffsetOfNextEntry) + recordsSize <- getFileSize records + let Bytes lastEntry = lastIndexEntry + when (recordsSize < fromIntegral lastEntry) $ + throwIO $ IndexAheadOfRecords records + -- If the records file is larger than the last index entry, a record + -- was appended but the program was interrupted before its index entry + -- was written. Discard the orphan bytes so the records file end matches + -- the index, keeping subsequent appends consistent. + when (recordsSize > fromIntegral lastEntry) $ + withFile records ReadWriteMode $ \h -> + hSetFileSize h (fromIntegral lastEntry) + return (Count count, lastIndexEntry) close :: HasCallStack => FilePartition -> IO () close FilePartition{..} = diff --git a/tests/Test/Queue.hs b/tests/Test/Queue.hs index e8f042b..e8bf64a 100644 --- a/tests/Test/Queue.hs +++ b/tests/Test/Queue.hs @@ -15,6 +15,9 @@ import qualified Data.Text as Text import Data.Foldable (traverse_) import qualified Data.Text.Encoding as Text import Data.Typeable (Typeable) +import System.Directory (getFileSize) +import System.FilePath (()) +import System.IO (IOMode(..), hSetFileSize, withFile) import System.IO.Temp (withSystemTempDirectory) import Test.Hspec ( Spec @@ -55,6 +58,8 @@ testQueues = do testQueue describe "partition" $ do testPartition withFilePartition + describe "file partition" $ do + testFilePartition describe "topic" $ do testTopic withFileTopic where @@ -340,6 +345,52 @@ instance Partition FailPartition where read _ = throwIO FailPartition write _ _ = return () +testFilePartition :: Spec +testFilePartition = do + it "fails to open when records file is smaller than last index entry" $ + withTempPath $ \path -> do + let name = "file-partition" + records = path name <> ".records" + + FilePartition.withFilePartition path name $ \partition -> + traverse_ (P.write partition) (take 5 messages) + + size <- getFileSize records + withFile records ReadWriteMode $ \h -> + hSetFileSize h (size - 1) + + let isInconsistent e + | Just (FilePartition.IndexAheadOfRecords _) <- fromException e = True + | otherwise = False + + FilePartition.withFilePartition path name (const $ return ()) + `shouldThrow` isInconsistent + + it "truncates records file to last index entry when records is larger" $ + withTempPath $ \path -> do + let name = "file-partition" + records = path name <> ".records" + (initial, more) = splitAt 5 (take 10 messages) + + FilePartition.withFilePartition path name $ \partition -> + traverse_ (P.write partition) initial + + originalSize <- getFileSize records + + -- Simulate an interrupted write: extra bytes in the records file + -- with no corresponding index entry. + appendFile records "orphan bytes\n" + + FilePartition.withFilePartition path name $ \partition -> do + afterSize <- getFileSize records + afterSize `shouldBe` originalSize + + traverse_ (P.write partition) more + + P.withReader partition Beginning $ \reader -> do + rs <- replicateM (length initial + length more) (snd <$> P.read reader) + rs `shouldBe` initial ++ more + testPartition :: Partition a => (forall b. FilePath -> (a -> IO b) -> IO b) -> Spec testPartition with = do it "reads what is written" $