diff --git a/src/aleph/services/storage/fileystem_engine.py b/src/aleph/services/storage/fileystem_engine.py index 4a209f4c..8f9c3063 100644 --- a/src/aleph/services/storage/fileystem_engine.py +++ b/src/aleph/services/storage/fileystem_engine.py @@ -23,7 +23,20 @@ async def read(self, filename: str) -> Optional[bytes]: async def write(self, filename: str, content: bytes): file_path = self.folder / filename - file_path.write_bytes(content) + temp_path = self.folder / f"{filename}.tmp" + + try: + # Write to temporary file first + temp_path.write_bytes(content) + + # Atomic rename - this operation is atomic on POSIX systems + # If crash happens before this, temp file exists but target doesn't + temp_path.replace(file_path) + + except Exception: + # Clean up temp file if write failed + temp_path.unlink(missing_ok=True) + raise async def delete(self, filename: str): file_path = self.folder / filename diff --git a/src/aleph/storage.py b/src/aleph/storage.py index d9a4d19f..dd7707ad 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -84,14 +84,39 @@ async def get_message_content( try: content = aleph_json.loads(item_content) - except aleph_json.DecodeError as e: + except (aleph_json.DecodeError, json.decoder.JSONDecodeError) as e: error_msg = f"Can't decode JSON: {e}" LOGGER.warning(error_msg) - raise InvalidContent(error_msg) - except json.decoder.JSONDecodeError as e: - error_msg = f"Can't decode JSON: {e}" - LOGGER.warning(error_msg) - raise InvalidContent(error_msg) + # If content was from local cache and is corrupted, delete it and retry + if source == ContentSource.DB and item_type in ( + ItemType.ipfs, + ItemType.storage, + ): + LOGGER.warning( + f"Corrupted cached content for {item_hash}, deleting and retrying from network" + ) + await self.storage_engine.delete(filename=item_hash) + # Retry fetching from network/IPFS + hash_content = await self.get_hash_content( + item_hash, + engine=ItemType(item_type), + use_network=True, + use_ipfs=True, + ) + item_content = hash_content.value + source = hash_content.source + # Try parsing again + try: + content = aleph_json.loads(item_content) + except ( + aleph_json.DecodeError, + json.decoder.JSONDecodeError, + ) as retry_error: + raise InvalidContent( + f"Content still invalid after retry: {retry_error}" + ) from retry_error + else: + raise InvalidContent(error_msg) return MessageContent( hash=item_hash,