From d470760e66216c96601bda7eb6401c81c92a196e Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Thu, 23 Apr 2026 13:55:16 +0100 Subject: [PATCH 1/3] Throw error if index is ahead of partition file --- src/Ambar/Emulator/Queue/Partition/File.hs | 10 +++++++-- tests/Test/Queue.hs | 26 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/Ambar/Emulator/Queue/Partition/File.hs b/src/Ambar/Emulator/Queue/Partition/File.hs index 5250df8..68fcfbd 100644 --- a/src/Ambar/Emulator/Queue/Partition/File.hs +++ b/src/Ambar/Emulator/Queue/Partition/File.hs @@ -115,6 +115,7 @@ data OpenError = AlreadyOpen FilePath | MissingRecords FilePath | MissingIndex FilePath + | IndexAheadOfRecords FilePath deriving Show instance Exception OpenError where @@ -122,6 +123,7 @@ instance Exception OpenError where AlreadyOpen path -> "Lock found. Partition already open: " <> path MissingRecords path -> "Missing records file: " <> path MissingIndex path -> "Missing index file: " <> path + IndexAheadOfRecords path -> "Records file is smaller than its index claims: " <> path open :: HasCallStack => FilePath -> String -> IO FilePartition open location name = do @@ -132,7 +134,7 @@ open location name = do (count, size) <- if not exists then createIndex index - else loadIndex index + else loadIndex records index writeFile lock "locked" fd_records <- openNonLockingWritableFD records @@ -165,10 +167,14 @@ open location name = do LB.writeFile index $ Binary.encode @Word64 0 return (Count 0, Bytes 0) - loadIndex index = do + loadIndex records index = do indexSize <- fromIntegral <$> getFileSize index let count = (indexSize `div` _WORD64_SIZE) - 1 lastIndexEntry <- readIndexEntry index (Offset count) + recordsSize <- getFileSize records + let Bytes lastEntry = lastIndexEntry + when (recordsSize < fromIntegral lastEntry) $ + throwIO $ IndexAheadOfRecords records -- This should be the same as the size of the records file. 
-- However we use the last index entry in case the program -- was interrupted between writing to the records file and diff --git a/tests/Test/Queue.hs b/tests/Test/Queue.hs index e8f042b..cff479a 100644 --- a/tests/Test/Queue.hs +++ b/tests/Test/Queue.hs @@ -15,6 +15,9 @@ import qualified Data.Text as Text import Data.Foldable (traverse_) import qualified Data.Text.Encoding as Text import Data.Typeable (Typeable) +import System.Directory (getFileSize) +import System.FilePath (()) +import System.IO (IOMode(..), hSetFileSize, withFile) import System.IO.Temp (withSystemTempDirectory) import Test.Hspec ( Spec @@ -55,6 +58,8 @@ testQueues = do testQueue describe "partition" $ do testPartition withFilePartition + describe "file partition" $ do + testFilePartition describe "topic" $ do testTopic withFileTopic where @@ -340,6 +345,27 @@ instance Partition FailPartition where read _ = throwIO FailPartition write _ _ = return () +testFilePartition :: Spec +testFilePartition = do + it "fails to open when records file is smaller than last index entry" $ + withTempPath $ \path -> do + let name = "file-partition" + records = path name <> ".records" + + FilePartition.withFilePartition path name $ \partition -> + traverse_ (P.write partition) (take 5 messages) + + size <- getFileSize records + withFile records ReadWriteMode $ \h -> + hSetFileSize h (size - 1) + + let isInconsistent e + | Just (FilePartition.IndexAheadOfRecords _) <- fromException e = True + | otherwise = False + + FilePartition.withFilePartition path name (const $ return ()) + `shouldThrow` isInconsistent + testPartition :: Partition a => (forall b. 
FilePath -> (a -> IO b) -> IO b) -> Spec testPartition with = do it "reads what is written" $ From 68a70f064c50e70f61fc5c157ff144f3dcdd4d24 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Thu, 23 Apr 2026 14:00:18 +0100 Subject: [PATCH 2/3] Trim partition if a restart prevents it from being appended to --- src/Ambar/Emulator/Queue/Partition/File.hs | 15 +++++++++------ tests/Test/Queue.hs | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/Ambar/Emulator/Queue/Partition/File.hs b/src/Ambar/Emulator/Queue/Partition/File.hs index 68fcfbd..39dc0b0 100644 --- a/src/Ambar/Emulator/Queue/Partition/File.hs +++ b/src/Ambar/Emulator/Queue/Partition/File.hs @@ -43,6 +43,7 @@ import qualified GHC.IO.Device as FD import System.IO ( Handle , hSeek + , hSetFileSize , withFile , openFile , hClose @@ -175,12 +176,14 @@ open location name = do let Bytes lastEntry = lastIndexEntry when (recordsSize < fromIntegral lastEntry) $ throwIO $ IndexAheadOfRecords records - -- This should be the same as the size of the records file. - -- However we use the last index entry in case the program - -- was interrupted between writing to the records file and - -- to the index file. - let byteOffsetOfNextEntry = lastIndexEntry - return (Count count, byteOffsetOfNextEntry) + -- If the records file is larger than the last index entry, a record + -- was appended but the program was interrupted before its index entry + -- was written. Discard the orphan bytes so the records file end matches + -- the index, keeping subsequent appends consistent. 
+ when (recordsSize > fromIntegral lastEntry) $ + withFile records ReadWriteMode $ \h -> + hSetFileSize h (fromIntegral lastEntry) + return (Count count, lastIndexEntry) close :: HasCallStack => FilePartition -> IO () close FilePartition{..} = diff --git a/tests/Test/Queue.hs b/tests/Test/Queue.hs index cff479a..36870bb 100644 --- a/tests/Test/Queue.hs +++ b/tests/Test/Queue.hs @@ -366,6 +366,25 @@ testFilePartition = do FilePartition.withFilePartition path name (const $ return ()) `shouldThrow` isInconsistent + it "truncates records file to last index entry when records is larger" $ + withTempPath $ \path -> do + let name = "file-partition" + records = path </> name <> ".records" + + FilePartition.withFilePartition path name $ \partition -> + traverse_ (P.write partition) (take 5 messages) + + originalSize <- getFileSize records + + -- Simulate an interrupted write: extra bytes in the records file + -- with no corresponding index entry. + appendFile records "orphan bytes\n" + + FilePartition.withFilePartition path name (const $ return ()) + + afterSize <- getFileSize records + afterSize `shouldBe` originalSize + testPartition :: Partition a => (forall b. 
FilePath -> (a -> IO b) -> IO b) -> Spec testPartition with = do it "reads what is written" $ From 6dfc474f7eb4fa2e216c95e5425b0356e2c73df1 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Thu, 23 Apr 2026 14:04:04 +0100 Subject: [PATCH 3/3] Test that partition continues working after orphaned bytes are trimmed --- tests/Test/Queue.hs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/Test/Queue.hs b/tests/Test/Queue.hs index 36870bb..e8bf64a 100644 --- a/tests/Test/Queue.hs +++ b/tests/Test/Queue.hs @@ -370,9 +370,10 @@ testFilePartition = do withTempPath $ \path -> do let name = "file-partition" records = path </> name <> ".records" + (initial, more) = splitAt 5 (take 10 messages) FilePartition.withFilePartition path name $ \partition -> - traverse_ (P.write partition) (take 5 messages) + traverse_ (P.write partition) initial originalSize <- getFileSize records @@ -380,10 +381,15 @@ testFilePartition = do -- with no corresponding index entry. appendFile records "orphan bytes\n" - FilePartition.withFilePartition path name (const $ return ()) + FilePartition.withFilePartition path name $ \partition -> do + afterSize <- getFileSize records + afterSize `shouldBe` originalSize - afterSize <- getFileSize records - afterSize `shouldBe` originalSize + traverse_ (P.write partition) more + + P.withReader partition Beginning $ \reader -> do + rs <- replicateM (length initial + length more) (snd <$> P.read reader) + rs `shouldBe` initial ++ more testPartition :: Partition a => (forall b. FilePath -> (a -> IO b) -> IO b) -> Spec testPartition with = do