From 9ca60c33af7637d4e336faf17c432708b702dda5 Mon Sep 17 00:00:00 2001
From: amackpro
Date: Fri, 16 Sep 2022 19:14:40 +0530
Subject: [PATCH] Kdz: Switch to iscgar's KDZ extractor impl

---
 extractor.sh   |  20 +-
 tools/unkdz.py | 716 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 724 insertions(+), 12 deletions(-)
 create mode 100644 tools/unkdz.py

diff --git a/extractor.sh b/extractor.sh
index 152acca..e55f85c 100755
--- a/extractor.sh
+++ b/extractor.sh
@@ -86,8 +86,7 @@ lpunpack="$toolsdir/$HOST/bin/lpunpack"
 splituapp="$toolsdir/splituapp"
 pacextractor="$toolsdir/pacExtractor.py"
 nb0_extract="$toolsdir/$HOST/bin/nb0-extract"
-kdz_extract="$toolsdir/kdztools/unkdz.py"
-dz_extract="$toolsdir/kdztools/undz.py"
+kdz_extract="$toolsdir/unkdz.py"
 ruu="$toolsdir/$HOST/bin/RUU_Decrypt_Tool"
 aml_extract="$toolsdir/aml-upgrade-package-extract"

@@ -121,15 +120,12 @@ fi

 if [[ $(echo "$romzip" | grep kdz) ]]; then
 	echo "KDZ detected"
-	python3 $kdz_extract -f "$romzip" -x -o "./"
-	dzfile=`ls *.dz`
-	python3 $dz_extract -f $dzfile -s -o "./"
-	# Some known dz-partitions "gpt_main persist misc metadata vendor system system_other product userdata gpt_backup tz boot dtbo vbmeta cust oem odm factory modem NON-HLOS"
-	find . -maxdepth 4 -type f -name "*.image" | rename 's/.image/.img/g' > /dev/null 2>&1
-	find . -maxdepth 4 -type f -name "*_a.img" | rename 's/_a.img/.img/g' > /dev/null 2>&1
-	for partition in $PARTITIONS; do
-		[[ -e "$tmpdir/$partition.img" ]] && mv "$tmpdir/$partition.img" "$outdir/$partition.img"
-	done
+	python3 $kdz_extract -e "./" "$romzip" >> /dev/null
+	rm -rf *GPT*
+	for i in $(ls | cut -d "." -f 2,3)
+	do
+		mv *"${i}" "$outdir"/"${i}"
+	done
 	rm -rf $tmpdir
 	exit 0
 fi
@@ -497,4 +493,4 @@ for partition in $PARTITIONS; do
 done

 cd "$LOCALDIR"
-rm -rf "$tmpdir"
+rm -rf "$tmpdir"
\ No newline at end of file
diff --git a/tools/unkdz.py b/tools/unkdz.py
new file mode 100644
index 0000000..8cbadf7
--- /dev/null
+++ b/tools/unkdz.py
@@ -0,0 +1,716 @@
+# A simple and correct LG KDZ Android image extractor, because I got fed up
+# with the partially working one from kdztools.
+#
+# Copyright (c) 2021 Isaac Garzon
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
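+
+# Usage, for reference (a sketch based on the argparse interface defined in
+# main() below; 'firmware.kdz' and './out' are placeholder names):
+#
+#   python3 unkdz.py firmware.kdz            # print header info and list parts
+#   python3 unkdz.py -e ./out firmware.kdz   # extract parts as <hw>.<name>.img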
+
+from __future__ import print_function
+import io
+import os
+import errno
+import argparse
+import struct
+import hashlib
+import binascii
+import datetime
+import collections
+import zlib
+import zstandard
+
+
+def decode_asciiz(s):
+    return s.rstrip(b'\x00').decode('ascii')
+
+
+def iter_read(file, size, chunk_size):
+    while size > 0:
+        chunk = file.read(min(chunk_size, size))
+        assert len(chunk) > 0
+        yield chunk
+        size -= len(chunk)
+
+
+class KdzHeader(object):
+    V1_HDR_SIZE = 1304
+    V1_MAGIC = 0x50447932
+    V2_HDR_SIZE = 1320
+    V2_MAGIC = 0x80253134
+    V3_HDR_SIZE = 1320
+    V3_MAGIC = 0x25223824
+
+    BASE_HDR_FMT = struct.Struct('
+            assert part.data_sect_cnt > 0, (
+                'unexpected empty part @ {} ({})'.format(i, name))
+            # Disabled validation because of broken vendor_b partition:
+            # assert part.end_sect == 0 or (
+            #     part.end_sect >= part.start_sect + part.data_sect_cnt), (
+            #     'Unexpected value for end sector {} == {} + {} @ {} ({})'.format(
+            #         part.end_sect, part.start_sect, part.data_sect_cnt, i, name))
+            assert part.reserved == 0, (
+                'unexpected reserved field value {} @ {} ({})'.format(
+                    part.reserved, i, name))
+            self.parts.setdefault(
+                part.hw_part, collections.OrderedDict()).setdefault(
+                    name, []).append(part)
+        self.magic = magic
+        self.flags = flags
+        self.signature = signature[:sig_size]
+
+
+class DzHeader(object):
+    MAGIC = 0x74189632
+    PART_MAGIC = 0x78951230
+
+    READ_CHUNK_SIZE = 1048576  # 1MiB
+
+    HW_PARTITION_NONE = 0x5000
+
+    HDR_FMT = struct.Struct(
+        '
+        assert part_count > 0, 'expected positive part count, got {}'.format(
+            part_count)
+
+        assert unk0 == 0, 'expected 0 in unknown field, got {}'.format(unk0)
+        assert unk1 in (0, 0xffffffff), 'unexpected value in unknown field: {:x}'.format(unk1)
+        assert unk2 in (0, 1), 'expected 0 or 1 in unknown field, got {}'.format(unk2)
+        assert all(b == 0 for b in padding), 'non zero bytes in header padding'
+
+        self.magic = magic
+        self.major = major
+        self.minor = minor
+        if all(w == 0 for w in (
+                build_year, build_mon, build_weekday, build_day,
+                build_hour, build_min, build_sec, build_millisec)):
+            self.build_date = None
+        else:
+            self.build_date = datetime.datetime(
+                build_year, build_mon, build_day, build_hour, build_min, build_sec,
+                microsecond=build_millisec*1000)
+            assert self.build_date.weekday() == build_weekday, (
+                'invalid build weekday. Expected {}, got {}'.format(
+                    self.build_date.weekday(), build_weekday))
+        self.compression = self._parse_compression_type(compression)
+        self.secure_image_type = secure_image_type
+        self.swfv = decode_asciiz(swfv)
+        self.build_type = decode_asciiz(build_type)
+        self.android_ver = decode_asciiz(android_ver)
+        self.memory_size = decode_asciiz(memory_size)
+        self.signed_security = decode_asciiz(signed_security)
+        self.anti_rollback_ver = anti_rollback_ver
+        self.supported_mem = decode_asciiz(supported_mem)
+        self.target_product = decode_asciiz(target_product)
+        self.operator_code = decode_asciiz(operator_code).split('.')
+        self.multi_panel_mask = multi_panel_mask
+        self.product_fuse_id = product_fuse_id
+        self.is_factory_image = is_factory_image == b'F'
+        self.is_ufs = bool(is_ufs)
+        self.chunk_hdrs_hash = chunk_hdrs_hash
+        self.data_hash = data_hash
+        self.header_crc = header_crc
+
+        if self.minor == 0:
+            self.parts = self._parse_v0_part_headers(part_count, file)
+        else:
+            self.parts = self._parse_v1_part_headers(part_count, file)
+
+        if self._verify_data_hash:
+            calculated_data_hash = self._verify_data_hash.digest()
+            assert self.data_hash == calculated_data_hash, (
+                'data hash mismatch: expected {}, got {}'.format(
+                    binascii.hexlify(self.data_hash),
+                    binascii.hexlify(calculated_data_hash)))
+
+    def _parse_compression_type(self, compression):
+        if compression[1] != 0:
+            compression = decode_asciiz(compression).lower()
+            assert compression in ('zlib', 'zstd'), (
+                'unknown compression {}'.format(compression))
+        else:
+            assert all(b == 0 for b in compression[1:]), (
+                'non zero bytes after compression type byte')
+            assert compression[0] in (1, 4), (
+                'unknown compression type {}'.format(compression[0]))
+            if compression[0] == 1:
+                compression = 'zlib'
+            elif compression[0] == 4:
+                compression = 'zstd'
+        return compression
+
+    def _parse_v0_part_headers(self, part_count, file):
+        parts = collections.OrderedDict()
+        verify_hdr_hash = hashlib.md5()
+        for i in range(part_count):
+            chunk_hdr_data = file.read(self.V0_PART_FMT.size)
+            # hw_partition and padding are referenced below, so they must come
+            # out of V0_PART_FMT as well; their position here is assumed.
+            (magic, part_name, chunk_name,
+             data_size, file_size, part_hash,
+             hw_partition, padding) = self.V0_PART_FMT.unpack(
+                 chunk_hdr_data)
+            verify_hdr_hash.update(chunk_hdr_data)
+            assert magic == self.PART_MAGIC, (
+                'invalid part magic {:x} @ index {}'.format(magic, i))
+            assert all(b == 0 for b in padding), (
+                'non zero bytes in part padding @ index {}'.format(i))
+            assert data_size > 0 and file_size > 0, (
+                'both data size ({}) and file size ({}) must be positive @ index {}'.format(
+                    data_size, file_size, i))
+            part_name = decode_asciiz(part_name)
+            chunk_name = decode_asciiz(chunk_name)
+            parts.setdefault(
+                hw_partition, collections.OrderedDict()).setdefault(
+                    part_name, []).append(self.Chunk(
+                        chunk_name, data_size, file.tell(), file_size,
+                        part_hash, 0, 0, 0, 0, 0, False, False))
+            if self._verify_data_hash:
+                self._verify_data_hash.update(chunk_hdr_data)
+                for chunk_data in iter_read(file, file_size, self.READ_CHUNK_SIZE):
+                    self._verify_data_hash.update(chunk_data)
+            else:
+                file.seek(file_size, 1)
+        assert verify_hdr_hash.digest() == self.chunk_hdrs_hash, (
+            'chunk headers hash mismatch: expected {}, got {}'.format(
+                binascii.hexlify(verify_hdr_hash.digest()),
+                binascii.hexlify(self.chunk_hdrs_hash)))
+        return parts
+
+    def _parse_v1_part_headers(self, part_count, file):
+        parts = collections.OrderedDict()
+        verify_hdr_hash = hashlib.md5()
+        part_start_sector = 0
+        part_sector_count = 0
+        for i in range(part_count):
+            chunk_hdr_data = file.read(self.V1_PART_FMT.size)
+            (magic, part_name, chunk_name,
+             data_size, file_size, part_hash,
+             start_sector, sector_count, hw_partition,
+             part_crc, unique_part_id, is_sparse, is_ubi_image,
+             maybe_pstart_sector, padding) = self.V1_PART_FMT.unpack(
+                 chunk_hdr_data)
+            verify_hdr_hash.update(chunk_hdr_data)
+            assert magic == self.PART_MAGIC, (
+                'invalid part magic {:x} @ index {}'.format(magic, i))
+            assert all(b == 0 for b in padding), (
+                'non zero bytes in part padding @ index {}'.format(i))
+            assert data_size > 0 and file_size > 0, (
+                'both data size ({}) and file size ({}) must be positive @ index {}'.format(
+                    data_size, file_size, i))
+            part_name = decode_asciiz(part_name)
+
+            if hw_partition not in parts:
+                part_start_sector = 0
+                part_sector_count = 0
+
+                if (maybe_pstart_sector > part_start_sector and
+                        maybe_pstart_sector <= start_sector):
+                    part_start_sector = maybe_pstart_sector
+            elif part_name not in parts[hw_partition]:
+                if maybe_pstart_sector == 0:
+                    part_start_sector = start_sector
+                else:
+                    part_start_sector += part_sector_count
+
+                    if (maybe_pstart_sector > part_start_sector and
+                            maybe_pstart_sector <= start_sector):
+                        part_start_sector = maybe_pstart_sector
+
+                part_sector_count = 0
+
+            assert maybe_pstart_sector == 0 or maybe_pstart_sector == part_start_sector, (
+                'mismatch in part start sector @ index {} (expected {}, got {})'.format(
+                    i, part_start_sector, maybe_pstart_sector))
+            chunk_name = decode_asciiz(chunk_name)
+            parts.setdefault(
+                hw_partition, collections.OrderedDict()).setdefault(
+                    part_name, []).append(self.Chunk(
+                        chunk_name, data_size, file.tell(), file_size,
+                        part_hash, part_crc, start_sector, sector_count,
+                        part_start_sector, unique_part_id,
+                        bool(is_sparse), bool(is_ubi_image)))
+
+            part_sector_count = (start_sector - part_start_sector) + sector_count
+            if self._verify_data_hash:
+                self._verify_data_hash.update(chunk_hdr_data)
+                for chunk_data in iter_read(file, file_size, self.READ_CHUNK_SIZE):
+                    self._verify_data_hash.update(chunk_data)
+            else:
+                file.seek(file_size, 1)
+        assert verify_hdr_hash.digest() == self.chunk_hdrs_hash, (
+            'chunk headers hash mismatch: expected {}, got {}'.format(
+                binascii.hexlify(verify_hdr_hash.digest()),
+                binascii.hexlify(self.chunk_hdrs_hash)))
+        return parts
+
+
+def parse_kdz_header(f):
+    def read_asciiz_data(offset, size):
+        f.seek(offset)
+        return decode_asciiz(f.read(size))
+
+    hdr = KdzHeader(f)
+
+    print('KDZ Header')
+    print('==========')
+    print('version = {}, magic = {:x}, size = {}'.format(hdr.version, hdr.magic, hdr.size))
+    print('records = {}'.format(len(hdr.records)))
+    for record in hdr.records:
+        print('    {}'.format(record))
+    print('tag = {}'.format(hdr.tag))
+    print('extended_mem_id = {}'.format(hdr.extended_mem_id))
+    print('    data = {}'.format(read_asciiz_data(hdr.extended_mem_id.offset, hdr.extended_mem_id.size)))
+    print('additional_records_size = {}'.format(hdr.additional_records_size))
+    print('    suffix_map = {}'.format(hdr.suffix_map))
+    print('        data = {}'.format(read_asciiz_data(hdr.suffix_map.offset, hdr.suffix_map.size).split('\n')))
+    print('    sku_map = {}'.format(hdr.sku_map))
+    print('        data = {}'.format(read_asciiz_data(hdr.sku_map.offset, hdr.sku_map.size).split('\n')))
+    print('    extended_sku_map = {}'.format(hdr.extended_sku_map))
+    print('        data =')
+    if hdr.extended_sku_map.size > 0:
+        print('        {}'.format('\n        '.join(read_asciiz_data(
+            hdr.extended_sku_map.offset, hdr.extended_sku_map.size).split('\n'))))
+    print('ftm_model_name = {}'.format(hdr.ftm_model_name))
+    print('')
+
+    return hdr
+
+
+def parse_secure_partition(f):
+    try:
+        sec_part = SecurePartition(f)
+    except ValueError:
+        print('No secure partition found')
+    else:
+        print('Secure Partition')
+        print('================')
+        print('magic = {:x}'.format(sec_part.magic))
+        print('flags = {:x}'.format(sec_part.flags))
+        print('signature = {}'.format(binascii.hexlify(sec_part.signature)))
+        print('parts = {}'.format(sum(
+            len(chunks) for p in sec_part.parts.values()
+            for chunks in p.values())))
+    print('')
+
+
+def parse_dz_record(f, dz_record):
+    f.seek(dz_record.offset)
+    dz_hdr = DzHeader(f)
+
+    print('DZ header')
+    print('=========')
+    print('magic = {:x}'.format(dz_hdr.magic))
+    print('major = {}'.format(dz_hdr.major))
+    print('minor = {}'.format(dz_hdr.minor))
+    print('build date = {}'.format(dz_hdr.build_date))
+    print('compression = {}'.format(dz_hdr.compression))
+    print('secure_image_type = {}'.format(dz_hdr.secure_image_type))
+    print('swfv = {}'.format(dz_hdr.swfv))
+    print('build_type = {}'.format(dz_hdr.build_type))
+    print('android_ver = {}'.format(dz_hdr.android_ver))
+    print('memory_size = {}'.format(dz_hdr.memory_size))
+    print('signed_security = {}'.format(dz_hdr.signed_security))
+    print('anti_rollback_ver = {:x}'.format(dz_hdr.anti_rollback_ver))
+    print('supported_mem = {}'.format(dz_hdr.supported_mem))
+    print('target_product = {}'.format(dz_hdr.target_product))
+    print('operator_code = {}'.format(dz_hdr.operator_code))
+    print('multi_panel_mask = {}'.format(dz_hdr.multi_panel_mask))
+    print('product_fuse_id = {}'.format(dz_hdr.product_fuse_id))
+    print('is_factory_image = {}'.format(dz_hdr.is_factory_image))
+    print('is_ufs = {}'.format(dz_hdr.is_ufs))
+    print('chunk_hdrs_hash = {}'.format(
+        binascii.hexlify(dz_hdr.chunk_hdrs_hash)))
+    print('data_hash = {}'.format(binascii.hexlify(dz_hdr.data_hash)))
+    print('header_crc = {:x}'.format(dz_hdr.header_crc))
+    print('parts = {}'.format(sum(
+        len(chunks) for p in dz_hdr.parts.values() for chunks in p.values())))
+    print('')
+
+    return dz_hdr
+
+
+def extract_dz_parts(f, dz_hdr, out_path):
+    if dz_hdr.compression == 'zlib':
+        def decompressor():
+            return zlib.decompressobj()
+    elif dz_hdr.compression == 'zstd':
+        def decompressor():
+            # This unfortunately doesn't do any good because the zstd library
+            # doesn't implement streaming decompression, so it'll load the
+            # entire stream into memory *sigh*
+            obj = zstandard.ZstdDecompressor()
+            return obj.decompressobj()
+
+    WRITE_FILL = b'\x00' * (4096 * 100)
+
+    for hw_part, parts in dz_hdr.parts.items():
+        print('Partition {}:'.format(hw_part))
+        for pname, chunks in parts.items():
+            out_file_name = os.path.join(out_path, '{}.{}.img'.format(hw_part, pname))
+            print('    extracting part {}...'.format(pname))
+            with open(out_file_name, 'wb') as out_f:
+                start_offset = chunks[0].part_start_sector * 4096
+                for i, chunk in enumerate(chunks):
+                    print('        extracting chunk {} ({} bytes)...'.format(
+                        chunk.name, max(
+                            chunk.data_size, chunk.sector_count * 4096)))
+                    expected_offset = chunk.start_sector * 4096
+                    while start_offset < expected_offset:
+                        write_len = min(
+                            expected_offset - start_offset, len(WRITE_FILL))
+                        out_f.write(WRITE_FILL[:write_len])
+                        start_offset += write_len
+                    f.seek(chunk.file_offset)
+                    decomp = decompressor()
+                    for chunk_data in iter_read(f, chunk.file_size, 1024*1024):
+                        chunk_data = decomp.decompress(chunk_data)
+                        out_f.write(chunk_data)
+                        start_offset += len(chunk_data)
+                    chunk_data = decomp.flush()
+                    out_f.write(chunk_data)
+                    start_offset += len(chunk_data)
+                    # sec_chunk = sec_part.partitions[hw_part][
+                    #     pname if pname != 'OP_S' else 'OP_a'][i]
+                expected_offset = (chunks[-1].start_sector + chunks[-1].sector_count) * 4096
+                while start_offset < expected_offset:
+                    write_len = min(
+                        expected_offset - start_offset, len(WRITE_FILL))
+                    out_f.write(WRITE_FILL[:write_len])
+                    start_offset += write_len
+            print('    done. extracted size = {} bytes'.format(
+                start_offset - (chunks[0].start_sector * 4096)))
+        print('')
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('file', type=argparse.FileType('rb'))
+    parser.add_argument('-e', '--extract-to')
+    args = parser.parse_args()
+
+    with args.file as in_file:
+        kdz_header = parse_kdz_header(in_file)
+        parse_secure_partition(in_file)
+        try:
+            dz_record = next(
+                record for record in kdz_header.records
+                if record.name.endswith('.dz'))
+            dz_hdr = parse_dz_record(in_file, dz_record)
+        except StopIteration:
+            raise SystemExit('No DZ record in KDZ file')
+
+        if args.extract_to:
+            try:
+                os.makedirs(args.extract_to)
+            except (OSError, IOError) as e:
+                if e.errno != errno.EEXIST:
+                    raise
+
+            extract_dz_parts(in_file, dz_hdr, args.extract_to)
+        else:
+            for hw_part, parts in dz_hdr.parts.items():
+                print('Partition {}:'.format(hw_part))
+                for pname, chunks in parts.items():
+                    print('    {}'.format(pname))
+                    for i, chunk in enumerate(chunks):
+                        print('        {}. {} ({} bytes, sparse: {})'.format(
+                            i, chunk.name, max(
+                                chunk.data_size, chunk.sector_count * 4096),
+                            chunk.is_sparse))
+                print('')
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file