diff --git a/aff4.py b/aff4.py
index 7725ca1..aed8f25 100644
--- a/aff4.py
+++ b/aff4.py
@@ -181,6 +181,14 @@ def verify(file, password):
             printVolumeInfo(file, childVolume)
             printCaseInfo(childVolume)
             resolver = childVolume.resolver
+
+            metadata_verified, metadata_hashes = resolver.verify_container_metadata_integrity(childVolume.zip_file)
+            print("\tContainer Metadata:")
+            if not metadata_verified:
+                print("\t\tContainer Metadata Verification Failed")
+            for hash_info in metadata_hashes:
+                print(f"\t\t- {hash_info['hash_type'].upper()} - {'VERIFIED' if hash_info['verified'] else 'FAILED'} | Stored: {hash_info['stored_hash']} - Calculated: {hash_info['calculated_hash']}")
+
             hasher = linear_hasher.LinearHasher2(resolver, LinearVerificationListener())
             for image in childVolume.images():
                 print("\t%s <%s>" % (image.name(), trimVolume(childVolume.urn, image.urn)))
@@ -190,6 +198,13 @@ def verify(file, password):
             printCaseInfo(volume)
             resolver = volume.resolver
 
+            metadata_verified, metadata_hashes = resolver.verify_container_metadata_integrity(volume.zip_file)
+            print("\tContainer Metadata:")
+            if not metadata_verified:
+                print("\t\tContainer Metadata Verification Failed")
+            for hash_info in metadata_hashes:
+                print(f"\t\t- {hash_info['hash_type'].upper()} - {'VERIFIED' if hash_info['verified'] else 'FAILED'} | Stored: {hash_info['stored_hash']} - Calculated: {hash_info['calculated_hash']}")
+
             if type(volume) == container.PhysicalImageContainer:
                 image = volume.image
                 listener = VerificationListener()
diff --git a/pyaff4/data_store.py b/pyaff4/data_store.py
index 6964872..baef8e9 100644
--- a/pyaff4/data_store.py
+++ b/pyaff4/data_store.py
@@ -31,6 +31,8 @@ import sys
 import types
 import binascii
+import hashlib
+import json
 
 from rdflib import URIRef
 
 from itertools import chain
@@ -490,6 +492,7 @@ def DumpToTurtle(self, zipcontainer, ):
                 break
             turtle_segment.Flush()
         turtle_segment.Close()
+        self.write_metadata_hashes(zipcontainer)
 
     def _DumpToTurtle(self, volumeurn, verbose=False):
         g = rdflib.Graph()
@@ -547,6 +550,56 @@ def loadMetadata(self, zip):
                 self.LoadFromTurtle(fd, zip.urn)
             self.loadedVolumes.append(zip.urn)
 
+    def write_metadata_hashes(self, zipcontainer):
+        # Digest the serialised turtle metadata and store the hashes in a container.hashes segment.
+        with zipcontainer.OpenZipSegment("information.turtle") as fd:
+            data = fd.read()
+        hashes = {
+            "md5": hashlib.md5(data).hexdigest(),
+            "sha1": hashlib.sha1(data).hexdigest(),
+            "sha256": hashlib.sha256(data).hexdigest()
+        }
+        with zipcontainer.CreateZipSegment(u"container.hashes") as container_hashes_segment:
+            container_hashes_segment.compression_method = ZIP_DEFLATE
+            container_hashes_segment.write(utils.SmartStr(json.dumps(hashes)))
+            container_hashes_segment.Flush()
+            container_hashes_segment.Close()
+
+    def read_metadata_hashes(self, zipcontainer):
+        if zipcontainer.ContainsMember("container.hashes"):
+            with zipcontainer.OpenZipSegment("container.hashes") as fd:
+                data = fd.read()
+                hashes = json.loads(data)
+                return hashes
+        else:
+            # No container.hashes segment found; return an empty hash list.
+            return {}
+
+    def verify_container_metadata_integrity(self, zipcontainer):
+        # Recompute the digests of information.turtle and compare them against container.hashes.
+        stored_hashes = self.read_metadata_hashes(zipcontainer)
+        hashes = []
+        failed = False
+        with zipcontainer.OpenZipSegment("information.turtle") as fd:
+            data = fd.read()
+        for hash_type, stored_hash in stored_hashes.items():
+            calculated_hash = ""
+            if hash_type == "md5":
+                calculated_hash = hashlib.md5(data).hexdigest()
+            elif hash_type == "sha1":
+                calculated_hash = hashlib.sha1(data).hexdigest()
+            elif hash_type == "sha256":
+                calculated_hash = hashlib.sha256(data).hexdigest()
+            verified = stored_hash == calculated_hash
+            hashes.append({
+                'hash_type': hash_type,
+                'stored_hash': stored_hash,
+                'calculated_hash': calculated_hash,
+                'verified': verified
+            })
+            failed = failed or not verified
+        return not failed, hashes
+
     def LoadFromTurtle(self, stream, volume_arn):
         data = streams.ReadAll(stream)
         g = rdflib.Graph()
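
Reviewer note: below is a minimal, self-contained sketch of the round trip the new methods implement: digest the raw bytes of information.turtle, persist the digests as JSON (the container.hashes payload), then recompute and compare on verify. The names HASH_ALGORITHMS, make_hashes, and verify_hashes are hypothetical and exist only for this sketch; the pyaff4 zip-segment I/O is deliberately left out.

    import hashlib
    import json

    # The three digest algorithms written by write_metadata_hashes.
    HASH_ALGORITHMS = ("md5", "sha1", "sha256")

    def make_hashes(data):
        # Same shape as the container.hashes JSON payload:
        # {"md5": "...", "sha1": "...", "sha256": "..."}
        return {alg: hashlib.new(alg, data).hexdigest() for alg in HASH_ALGORITHMS}

    def verify_hashes(data, stored_hashes):
        # Recompute each stored digest and compare, mirroring the
        # (verified, results) return of verify_container_metadata_integrity.
        results = []
        for hash_type, stored_hash in stored_hashes.items():
            calculated_hash = hashlib.new(hash_type, data).hexdigest()
            results.append({
                "hash_type": hash_type,
                "stored_hash": stored_hash,
                "calculated_hash": calculated_hash,
                "verified": stored_hash == calculated_hash,
            })
        return all(r["verified"] for r in results), results

    turtle = b"@prefix aff4: <http://aff4.org/Schema#> ."
    stored = json.loads(json.dumps(make_hashes(turtle)))  # JSON round trip, as container.hashes does
    ok, _ = verify_hashes(turtle, stored)
    assert ok
    ok, _ = verify_hashes(turtle + b" tampered", stored)
    assert not ok

One behavioural point grounded in the patch: for containers created before this change, no container.hashes segment exists, read_metadata_hashes returns an empty dict, and verification passes vacuously with an empty result list. If that case should be reported as a failure instead, verify_container_metadata_integrity would need an explicit check for empty stored hashes.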