15 changes: 15 additions & 0 deletions aff4.py
@@ -181,6 +181,14 @@ def verify(file, password):
printVolumeInfo(file, childVolume)
printCaseInfo(childVolume)
resolver = childVolume.resolver

metadata_verified, metadata_hashes = resolver.verify_container_metadata_integrity(volume.zip_file)
print("\tContainer Metadata:")
if not metadata_verified:
print("\t\tContainer Metadata Verification Failed")
for h in metadata_hashes:
print(f"\t\t- {h['hash_type'].upper()} - {'VERIFIED' if h['verified'] else 'FAILED'} | Stored: {h['stored_hash']} - Calculated: {h['calculated_hash']}")

hasher = linear_hasher.LinearHasher2(resolver, LinearVerificationListener())
for image in childVolume.images():
print("\t%s <%s>" % (image.name(), trimVolume(childVolume.urn, image.urn)))
@@ -190,6 +198,13 @@ def verify(file, password):
printCaseInfo(volume)
resolver = volume.resolver

metadata_verified, metadata_hashes = resolver.verify_container_metadata_integrity(volume.zip_file)
print("\tContainer Metadata:")
if not metadata_verified:
print("\t\tContainer Metadata Verification Failed")
for h in metadata_hashes:
print(f"\t\t- {h['hash_type'].upper()} - {'VERIFIED' if h['verified'] else 'FAILED'} | Stored: {h['stored_hash']} - Calculated: {h['calculated_hash']}")

if type(volume) == container.PhysicalImageContainer:
image = volume.image
listener = VerificationListener()
52 changes: 52 additions & 0 deletions pyaff4/data_store.py
@@ -31,6 +31,8 @@
import sys
import types
import binascii
import hashlib
import json

from rdflib import URIRef
from itertools import chain
@@ -490,6 +492,7 @@ def DumpToTurtle(self, zipcontainer, ):
break
turtle_segment.Flush()
turtle_segment.Close()
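# Record hashes of the freshly written information.turtle in a container.hashes segment for later verification.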
self.write_metadata_hashes(zipcontainer)

def _DumpToTurtle(self, volumeurn, verbose=False):
g = rdflib.Graph()
@@ -547,6 +550,55 @@ def loadMetadata(self, zip):
self.LoadFromTurtle(fd, zip.urn)
self.loadedVolumes.append(zip.urn)

def write_metadata_hashes(self, zipcontainer):
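# Hash the information.turtle segment (MD5, SHA1, SHA256) and store the digests as JSON in a container.hashes segment.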
with zipcontainer.OpenZipSegment("information.turtle") as fd:
data = fd.read()
hashes = {
"md5": hashlib.md5(data).hexdigest(),
"sha1": hashlib.sha1(data).hexdigest(),
"sha256": hashlib.sha256(data).hexdigest()
}
with zipcontainer.CreateZipSegment(u"container.hashes") as container_hashes_segment:
container_hashes_segment.compression_method = ZIP_DEFLATE
container_hashes_segment.write(utils.SmartStr(json.dumps(hashes)))
container_hashes_segment.Flush()
container_hashes_segment.Close()

def read_metadata_hashes(self, zipcontainer):
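# Load the stored metadata hashes from the container.hashes segment, if present; otherwise return an empty dict.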
# containerHashesURN = escaping.urn_from_member_name(u"container.hashes", zipcontainer.urn, zipcontainer.version)
if zipcontainer.ContainsMember("container.hashes"):
with zipcontainer.OpenZipSegment("container.hashes") as fd:
data = fd.read()
hashes = json.loads(data)
return hashes
else:
# No container.hashes segment found; return an empty hash dict.
return {}

def verify_container_metadata_integrity(self, zipcontainer):
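# Recompute the hashes of information.turtle and compare them against the stored container.hashes entries.
# Returns a (verified, per-hash results) tuple.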
stored_hashes = self.read_metadata_hashes(zipcontainer)
hashes = []
failed = False
with zipcontainer.OpenZipSegment("information.turtle") as fd:
data = fd.read()
for hash_type, stored_hash in stored_hashes.items():
calculated_hash = ""
if hash_type == "md5":
calculated_hash = hashlib.md5(data).hexdigest()
elif hash_type == "sha1":
calculated_hash = hashlib.sha1(data).hexdigest()
elif hash_type == "sha256":
calculated_hash = hashlib.sha256(data).hexdigest()
verified = stored_hash == calculated_hash
hashes.append({
'hash_type': hash_type,
'stored_hash': stored_hash,
'calculated_hash': calculated_hash,
'verified': verified
})
failed = failed or not verified
return not failed, hashes

def LoadFromTurtle(self, stream, volume_arn):
data = streams.ReadAll(stream)
g = rdflib.Graph()