# --------------------------------------------------------------
#
# MIT LICENSE
# MESUT ERTURHAN
# https://github.com/piyxu/bitloss
# --------------------------------------------------------------

"""
Bitloss encoder/decoder built on top of encode.py/convert.py helpers.

Project status (from the repository README): these are prototype projects.
The author tried to build them using AI; because the AI could not be made to
accept the algorithms, the examples in the branches are malfunctioning.
Main is fine.

Encoding flow per 256-bit block:
  1. Map raw block to R in [0, 16*C(256,128)) and run encode256
     (metadata + balanced bits).
  2. Drop the last balanced bit, rank the remaining 255 bits with fixed
     k=128 (convert.py logic).
  3. Store 4-bit metadata + 251-bit rank for a 255-bit payload.

All encoded payload bits are stored as ASCII bit strings in text files.
Only the final stage0 decode writes binary data.

Header: only the file produced by the final encode stage carries a header
(1-bit block flag, 32-bit original byte count, 256-bit BLAKE3 digest).
Intermediate stages are headerless.

Data integrity: BLAKE3 hash verification on encode/decode.
"""

from __future__ import annotations

import argparse
import sys
import base64
from math import comb
from pathlib import Path
from typing import Iterable, List, Tuple

try:
    import blake3
except ImportError:
    # Do not terminate the interpreter at import time: the pure ranking
    # helpers below are usable without blake3.  The missing dependency is
    # reported the first time a hash is actually requested.
    class _MissingBlake3:
        """Stand-in for the blake3 module that fails on first use."""

        @staticmethod
        def blake3(*_args, **_kwargs):
            raise ImportError(
                "blake3 module not found. Install with: pip install blake3"
            )

    blake3 = _MissingBlake3()


# Number of 256-bit words containing exactly 128 ones.
C256 = comb(256, 128)
# Mask selecting the low 256 bits of an integer.
MASK_256 = (1 << 256) - 1
# Exclusive upper bound of the values encode256 accepts (4-bit meta * C256).
MAX_R = 16 * C256


def unrank_nk(n: int, k: int, r: int) -> List[int]:
    """Return the n-bit word with exactly k ones whose lexicographic rank is r.

    Args:
        n: Word length in bits.
        k: Number of one bits in the word.
        r: Rank of the word, in ``[0, C(n, k))``.

    Raises:
        ValueError: If ``r`` is outside ``[0, C(n, k))`` (or ``n``/``k`` are
            negative, which ``math.comb`` rejects).
    """
    total = comb(n, k)
    if not 0 <= r < total:
        raise ValueError(f"rank must be in [0, C({n},{k}))")
    bits: List[int] = []
    rem = k
    for i in range(n):
        # Count of words that put a 0 here and still place `rem` ones in the
        # remaining n - i - 1 positions (comb yields 0 when that cannot fit).
        zeros_here = comb(n - i - 1, rem)
        if r < zeros_here:
            bits.append(0)
        else:
            bits.append(1)
            r -= zeros_here
            rem -= 1
    return bits


def rank_nk(bits: Iterable[int], k_fixed: int) -> int:
    """Return the lexicographic rank of ``bits`` among words with k_fixed ones.

    This is the inverse of :func:`unrank_nk` for words that contain exactly
    ``k_fixed`` ones.  The number of ones is not validated; a word with more
    than ``k_fixed`` ones makes ``math.comb`` raise ``ValueError``.
    """
    word = list(bits)
    n = len(word)
    rem = k_fixed
    rank = 0
    for i, bit in enumerate(word):
        if bit == 1:
            rank += comb(n - i - 1, rem)
            rem -= 1
    return rank


def unrank_256_128(r: int) -> List[int]:
    """Unrank ``r`` into a 256-bit word with exactly 128 ones."""
    return unrank_nk(256, 128, r)


def rank_256_128(bits: Iterable[int]) -> int:
    """Rank a 256-bit word using a fixed weight of 128."""
    return rank_nk(bits, 128)


def encode256(r_value: int) -> Tuple[List[int], int]:
    """Split ``r_value`` into a balanced 256-bit word and a 4-bit meta value.

    Returns:
        ``(bits256, meta)`` where ``meta = r_value // C256`` and ``bits256``
        is the balanced word of rank ``r_value % C256``.

    Raises:
        ValueError: If ``r_value`` is outside ``[0, MAX_R)``.
    """
    if not (0 <= r_value < MAX_R):
        raise ValueError("R must be in [0, 16*C(256,128))")
    meta, idx = divmod(r_value, C256)
    return unrank_256_128(idx), meta


def decode256(bits256: Iterable[int], meta: int) -> int:
    """Rebuild the value produced from ``(bits256, meta)`` by encode256."""
    return meta * C256 + rank_256_128(list(bits256))
def true_rank_fixed_k128(bits255: Iterable[int]) -> int:
    """convert.py rank implementation (fixed k=128 on 255 bits).

    For every one bit at position ``i`` the term ``C(255 - i - 1, rem - 1)``
    is added, where ``rem`` starts at 128 and drops by one per one bit.
    Scanning stops once 128 ones have been consumed.
    """
    n = 255
    rem = 128
    rank = 0
    for i, bit in enumerate(bits255):
        if bit == 1:
            rank += comb(n - i - 1, rem - 1)
            rem -= 1
            if rem == 0:
                break
    return rank


def unrank_true_fixed_k128(rank_value: int) -> List[int]:
    """Greedy counterpart of true_rank_fixed_k128 (fixed k=128 over 255 bits).

    Always returns exactly 255 bits.
    NOTE(review): the greedy walk is not shown to be a true inverse of
    true_rank_fixed_k128 for every input — confirm before relying on it.
    """
    n = 255
    rem = 128
    bits: List[int] = []
    for i in range(n):
        if rem == 0:
            # All ones placed; the rest of the word is zero.
            bits.append(0)
            continue
        weight = comb(n - i - 1, rem - 1)
        if rank_value < weight:
            bits.append(0)
        else:
            bits.append(1)
            rank_value -= weight
            rem -= 1
    return bits


def decode_missing_bit(bits255: List[int]) -> int:
    """convert.py style recovery of the (lost) 256th bit.

    A balanced 256-bit word has 128 ones, so 127 ones in the first 255 bits
    means the dropped bit was 1.  Every other weight yields 0.
    """
    return 1 if sum(bits255) == 127 else 0


def int_to_bits(value: int, width: int) -> List[int]:
    """Return the low ``width`` bits of ``value``, most significant first.

    Bits above ``width`` are silently discarded.
    """
    return [(value >> shift) & 1 for shift in range(width - 1, -1, -1)]


def bytes_to_bits(data: bytes) -> List[int]:
    """Expand bytes into a flat list of bits, most significant bit first."""
    return [(byte >> shift) & 1 for byte in data for shift in range(7, -1, -1)]


def bits_to_bytes(bits: List[int]) -> bytes:
    """Pack a list of bits (MSB first) into bytes.

    Raises:
        ValueError: If the number of bits is not a multiple of 8.
    """
    if len(bits) % 8 != 0:
        raise ValueError("Bit length must be a multiple of 8")
    out = bytearray()
    for start in range(0, len(bits), 8):
        value = 0
        for bit in bits[start:start + 8]:
            value = (value << 1) | bit
        out.append(value)
    return bytes(out)


def bits_to_bitstring(bits: List[int]) -> str:
    """Convert list of bits to string representation."""
    return "".join(str(bit) for bit in bits)


def bitstring_to_bits(bitstring: str) -> List[int]:
    """Convert string representation to list of bits.

    Characters other than '0' and '1' (whitespace, newlines, ...) are skipped.
    """
    return [int(c) for c in bitstring if c in "01"]


def bytes_to_bitstring(data: bytes) -> str:
    """Render bytes as an ASCII string of '0'/'1' characters."""
    return "".join("1" if bit else "0" for bit in bytes_to_bits(data))


def compute_blake3_hash(data: bytes) -> str:
    """Compute BLAKE3 hash and return as base64 string."""
    digest = blake3.blake3(data).digest()
    return base64.b64encode(digest).decode("ascii")


def verify_blake3_hash(data: bytes, expected_hash: str) -> bool:
    """Verify BLAKE3 hash matches expected value."""
    return compute_blake3_hash(data) == expected_hash


class BitWriter:
    """Append-only bit buffer."""

    def __init__(self) -> None:
        self.bits: List[int] = []
        self.total_bits = 0

    def write_bits(self, bits: Iterable[int]) -> None:
        """Append each value's lowest bit to the buffer."""
        for bit in bits:
            self.bits.append(bit & 1)
            self.total_bits += 1

    def get_bitstring(self) -> str:
        """Return all bits as a string."""
        return bits_to_bitstring(self.bits)

    def get_bits(self) -> List[int]:
        """Return all bits as a list."""
        return self.bits.copy()


class BitReader:
    """Sequential reader over an ASCII bit string."""

    def __init__(self, bitstring: str) -> None:
        self.bits = bitstring_to_bits(bitstring)
        self.pos = 0
        self.useful_bits = len(self.bits)

    def _read_bit(self) -> int:
        if self.pos >= self.useful_bits:
            raise ValueError("Attempting to read past the end of bit stream")
        bit = self.bits[self.pos]
        self.pos += 1
        return bit

    def read_bits(self, count: int) -> List[int]:
        """Read ``count`` bits; raises ValueError past the end of the stream."""
        return [self._read_bit() for _ in range(count)]

    def read_int(self, count: int) -> int:
        """Read ``count`` bits as an unsigned big-endian integer."""
        value = 0
        for _ in range(count):
            value = (value << 1) | self._read_bit()
        return value

    def remaining(self) -> int:
        """Number of bits not yet consumed."""
        return self.useful_bits - self.pos


# Layout of the header written by the final encode stage.
_FLAG_BITS = 1
_SIZE_BITS = 32
_HASH_BITS = 256
_HEADER_BITS = _FLAG_BITS + _SIZE_BITS + _HASH_BITS


def encode_block(block_value: int) -> Tuple[List[int], List[int]]:
    """Encode one block value into (4 meta bits, 251 rank bits).

    Raises:
        ValueError: If ``block_value`` is outside ``[0, MAX_R)``.
    """
    if not (0 <= block_value < MAX_R):
        raise ValueError("Block value out of range for encode256 parameters")
    bits256, meta = encode256(block_value)
    # The last balanced bit is dropped; decode_missing_bit re-derives it.
    rank_value = true_rank_fixed_k128(bits256[:-1])
    return int_to_bits(meta, 4), int_to_bits(rank_value, 251)


def encode_once(src: Path, dest: Path, stage: int = 1, total_stages: int = 1) -> None:
    """Run one encode stage from ``src`` to ``dest``.

    Stage 1 reads ``src`` as binary and hashes it; later stages read ``src``
    as a headerless ASCII bit string.  Every full 32-byte chunk becomes a
    255-bit payload (4 meta bits + 251 rank bits); trailing bytes are copied
    bit-for-bit.  The final stage (``stage == total_stages``) prepends a
    fixed 289-bit header: block flag, 32-bit byte count, 256-bit BLAKE3
    digest (all zero when no digest is known, i.e. when stage > 1).

    A report is written next to ``dest`` as ``encode_report{stage}.txt``.

    Raises:
        ValueError: If the byte count does not fit the 32-bit header field.
    """
    payload_writer = BitWriter()
    logs: List[str] = []
    is_final_stage = stage == total_stages

    blake3_hash = None
    if stage == 1:
        # Original file: raw bytes, hashed for later verification.
        source_bytes = src.read_bytes()
        original_byte_count = len(source_bytes)
        blake3_hash = compute_blake3_hash(source_bytes)
    else:
        # Intermediate .btl file: pure bit string, zero-padded to a byte
        # boundary so it can be chunked like binary input.
        source_bits = bitstring_to_bits(src.read_text(encoding="utf-8").strip())
        original_bit_count = len(source_bits)
        source_bits.extend([0] * (-original_bit_count % 8))
        source_bytes = bits_to_bytes(source_bits)
        original_byte_count = (original_bit_count + 7) // 8

    if is_final_stage and original_byte_count >= (1 << _SIZE_BITS):
        raise ValueError("Input is too large for the 32-bit size header")

    full_blocks, remainder_bytes = divmod(len(source_bytes), 32)

    for block_index in range(full_blocks):
        chunk = source_bytes[block_index * 32:(block_index + 1) * 32]
        block_value = int.from_bytes(chunk, "big")
        # NOTE(review): the overflow flag is only logged, never stored, so a
        # block >= MAX_R cannot be told apart from its reduced value on decode.
        overflow = 1 if block_value >= MAX_R else 0
        if overflow:
            block_value -= MAX_R
        meta_bits, rank_bits = encode_block(block_value)
        payload_writer.write_bits(meta_bits)
        payload_writer.write_bits(rank_bits)
        meta_val = int(bits_to_bitstring(meta_bits), 2)
        rank_val = int(bits_to_bitstring(rank_bits), 2)
        logs.append(
            f"encode block {block_index}: overflow={overflow} meta={meta_val} rank={rank_val} match=True"
        )

    if remainder_bytes > 0:
        # Trailing bytes that do not fill a block are stored verbatim.
        remainder = source_bytes[full_blocks * 32:]
        payload_writer.write_bits(bytes_to_bits(remainder))
        logs.append(f"encode remainder_bytes={remainder_bytes} match=True")

    final_writer = BitWriter()
    if is_final_stage:
        block_flag = 1 if remainder_bytes == 0 else 0
        final_writer.write_bits([block_flag])
        final_writer.write_bits(int_to_bits(original_byte_count, _SIZE_BITS))
        # The decoder always consumes a 256-bit digest field, so it is always
        # emitted; all zeros means "no digest recorded".
        if blake3_hash:
            final_writer.write_bits(bytes_to_bits(base64.b64decode(blake3_hash)))
        else:
            final_writer.write_bits([0] * _HASH_BITS)
    final_writer.write_bits(payload_writer.get_bits())

    dest.write_text(final_writer.get_bitstring(), encoding="utf-8")

    report_path = dest.parent / f"encode_report{stage}.txt"
    with open(report_path, "w", encoding="utf-8") as rep:
        rep.write(f"encode stage {stage}: original_bytes={original_byte_count} full_blocks={full_blocks} remainder_bytes={remainder_bytes} total_bits={final_writer.total_bits}\n")
        if blake3_hash:
            rep.write(f"blake3_hash: {blake3_hash}\n")
        rep.writelines(line + "\n" for line in logs)


def decode_once(src: Path, dest: Path, stage: int = 1, total_stages: int = 1, is_final: bool = False) -> None:
    """Run one decode stage from ``src`` to ``dest``.

    The first decode (``stage == 1``) consumes the 289-bit header written by
    the final encode stage.  Each 255-bit payload is expanded to 32 bytes;
    leftover bits are copied through.  When ``is_final`` is true the result
    is written as binary (truncated to the recorded size and checked against
    the recorded BLAKE3 digest, if any); otherwise it is written as an ASCII
    bit string.

    A report is written next to ``dest`` as ``decode_report{stage}.txt``.
    ``total_stages`` is accepted for symmetry with encode_once and is unused.

    Raises:
        ValueError: If the header is missing or inconsistent with the payload.
    """
    reader = BitReader(src.read_text(encoding="utf-8").strip())
    logs: List[str] = []

    block_flag = None
    original_byte_count = None
    expected_blake3_hash = None

    if stage == 1:
        if reader.remaining() < _HEADER_BITS:
            raise ValueError("Encoded payload is too small")
        block_flag = reader.read_int(_FLAG_BITS)
        original_byte_count = reader.read_int(_SIZE_BITS)
        hash_bits = reader.read_bits(_HASH_BITS)
        # An all-zero digest field means the encoder had no hash to record.
        if any(hash_bits):
            expected_blake3_hash = base64.b64encode(bits_to_bytes(hash_bits)).decode("ascii")
        logs.append(f"decode header: block_flag={block_flag} original_byte_count={original_byte_count}")
        logs.append(f"expected_blake3_hash: {expected_blake3_hash}")

    full_blocks, remainder_bits = divmod(reader.remaining(), 255)

    if block_flag == 1 and remainder_bits != 0:
        raise ValueError(f"Header says last block was full but {remainder_bits} trailing bits exist")
    if block_flag == 0 and remainder_bits == 0:
        raise ValueError("Header expects leftover bits but count is zero")

    out_bits: List[int] = []

    for block_index in range(full_blocks):
        meta = reader.read_int(4)
        rank_value = reader.read_int(251)
        bits255 = unrank_true_fixed_k128(rank_value)
        bits256 = bits255 + [decode_missing_bit(bits255)]
        block_value = decode256(bits256, meta)
        out_bits.extend(bytes_to_bits((block_value & MASK_256).to_bytes(32, "big")))
        logs.append(
            f"decode block {block_index}: meta={meta} rank_bits=251 k255={sum(bits255)} match=True"
        )

    if remainder_bits > 0:
        out_bits.extend(reader.read_bits(remainder_bits))
        logs.append(f"decode remainder_bits={remainder_bits} match=True")

    if is_final:
        # Binary output: pad to a byte boundary, then cut back to the
        # recorded size when it is known.
        out_bits.extend([0] * (-len(out_bits) % 8))
        out_bytes = bits_to_bytes(out_bits)

        if original_byte_count is not None:
            out_bytes = out_bytes[:original_byte_count]
            logs.append(f"truncated to original size: {original_byte_count} bytes")

        if expected_blake3_hash is not None:
            actual_hash = compute_blake3_hash(out_bytes)
            if actual_hash == expected_blake3_hash:
                logs.append(f"✓ BLAKE3 verification PASSED: {actual_hash}")
                print("✓ BLAKE3 hash verification PASSED")
            else:
                # A mismatch is reported but the output is still written.
                logs.append("✗ BLAKE3 verification FAILED!")
                logs.append(f"  Expected: {expected_blake3_hash}")
                logs.append(f"  Actual:   {actual_hash}")
                print("✗ WARNING: BLAKE3 hash verification FAILED!", file=sys.stderr)
                print(f"  Expected: {expected_blake3_hash}", file=sys.stderr)
                print(f"  Actual:   {actual_hash}", file=sys.stderr)

        dest.write_bytes(out_bytes)
    else:
        dest.write_text(bits_to_bitstring(out_bits), encoding="utf-8")

    # Report lines may contain non-ASCII marks, so the encoding is explicit.
    report_path = dest.parent / f"decode_report{stage}.txt"
    with open(report_path, "w", encoding="utf-8") as rep:
        rep.write(f"decode stage {stage}: full_blocks={full_blocks} remainder_bits={remainder_bits} total_output_bits={len(out_bits)}\n")
        rep.writelines(line + "\n" for line in logs)


def hyphenated_name(src: Path) -> str:
    """Return the file name with its last '.' replaced by '-'."""
    name = src.name
    if "." in name:
        stem, ext = name.rsplit(".", 1)
        return f"{stem}-{ext}"
    return name


def encode_stage_name(base: str, iteration: int) -> str:
    """Return ``{base}.btl`` for iteration 1, else ``{base}.{iteration}.btl``.

    Raises:
        ValueError: If ``iteration`` is less than 1.
    """
    if iteration < 1:
        raise ValueError("Iteration must be >= 1")
    return f"{base}.btl" if iteration == 1 else f"{base}.{iteration}.btl"


def encoded_filename(src: Path, repeat: int) -> Path:
    """Path of the file produced by encoding ``src`` ``repeat`` times."""
    return src.with_name(encode_stage_name(hyphenated_name(src), repeat))


def run_encode(path_str: str, repeat: int) -> Path:
    """Encode a file ``repeat`` times, deleting intermediate stage files.

    Returns:
        Path of the final encoded file.

    Raises:
        ValueError: If ``repeat`` is less than 1.
    """
    src = Path(path_str)
    if repeat < 1:
        raise ValueError("Repeat count must be >= 1")
    base = hyphenated_name(src)
    working_src = src

    for iteration in range(1, repeat + 1):
        dest = src.with_name(encode_stage_name(base, iteration))
        encode_once(working_src, dest, iteration, repeat)
        if working_src != src:
            # Intermediate output of the previous stage is no longer needed.
            working_src.unlink(missing_ok=True)
        working_src = dest

    return encoded_filename(src, repeat)


def parse_encoded_name(path: Path) -> Tuple[str, int]:
    """Split an encoded file name into ``(base, repeat)``.

    ``name.btl`` yields repeat 1; ``name.N.btl`` with numeric ``N`` yields N.

    Raises:
        ValueError: If the name does not end with ``.btl`` or the inferred
            repeat count is less than 1.
    """
    name = path.name
    if not name.endswith(".btl"):
        raise ValueError("Encoded file must end with .btl")
    base = name[:-4]
    repeat = 1
    if "." in base:
        candidate, suffix = base.rsplit(".", 1)
        if suffix.isdigit():
            base = candidate
            repeat = int(suffix)
    if repeat < 1:
        raise ValueError("Repeat count inferred from file name must be >= 1")
    return base, repeat


def restore_original_name(base: str) -> str:
    """Turn the last '-' of ``base`` back into '.'.

    NOTE(review): a source name that contained '-' but no '.' is not restored
    to its original spelling (e.g. 'my-file' becomes 'my.file').
    """
    if "-" not in base:
        return base
    stem, ext = base.rsplit("-", 1)
    return f"{stem}.{ext}"


def run_decode(path_str: str) -> Path:
    """Decode an encoded file through every stage inferred from its name.

    Intermediate stage files are deleted as soon as they have been consumed.

    Returns:
        Path of the restored file.
    """
    src = Path(path_str)
    base, repeat = parse_encoded_name(src)
    final_path = src.with_name(restore_original_name(base))
    working_src = src

    for stage_counter, iteration in enumerate(range(repeat, 0, -1), start=1):
        is_final = iteration == 1
        if is_final:
            dest = final_path
        else:
            dest = src.with_name(encode_stage_name(base, iteration - 1))

        decode_once(working_src, dest, stage_counter, repeat, is_final=is_final)

        if working_src != src:
            working_src.unlink(missing_ok=True)
        working_src = dest

    return final_path


def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command line: exactly one of --encode / --decode."""
    parser = argparse.ArgumentParser(description="Bitloss encoder / decoder")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-e", "--encode", nargs="+", metavar=("FILE", "REPEAT"), help="Encode file; optional repeat count")
    group.add_argument("-d", "--decode", metavar="FILE", help="Decode file (repeat inferred from name)")
    return parser.parse_args(argv)


def main(argv: List[str]) -> int:
    """CLI entry point; returns the process exit status (0 ok, 1 error)."""
    args = parse_args(argv)
    try:
        if args.encode is not None:
            if not (1 <= len(args.encode) <= 2):
                raise ValueError("Encode requires FILE and optional REPEAT")
            src = args.encode[0]
            repeat = int(args.encode[1]) if len(args.encode) == 2 else 1
            output = run_encode(src, repeat)
            print(f"Encoded → {output}")
        else:
            output = run_decode(args.decode)
            print(f"Decoded → {output}")
    except Exception as exc:
        # Top-level boundary: report any failure and signal it via exit status.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))