Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 115 additions & 24 deletions node/rustchain_v2_integrated_v2.2.1_rip200.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,102 @@ def init_db():
"saturn_sh2": 2.6, "gba_arm7": 2.3, "default": 2.5}
}

POWERPC_ARCHES = {"g3", "g4", "g5", "power8", "power9", "powerpc", "power macintosh"}
X86_CPU_BRANDS = {"intel", "xeon", "core", "celeron", "pentium", "amd", "ryzen", "epyc", "athlon", "threadripper"}
ARM_CPU_BRANDS = {"arm", "aarch64", "cortex", "neoverse", "apple m1", "apple m2", "apple m3"}


def _fingerprint_checks_map(fingerprint: dict) -> dict:
if not isinstance(fingerprint, dict):
return {}
checks = fingerprint.get("checks", {})
return checks if isinstance(checks, dict) else {}


def _fingerprint_check_data(fingerprint: dict, check_name: str) -> dict:
item = _fingerprint_checks_map(fingerprint).get(check_name, {})
if isinstance(item, dict):
data = item.get("data", {})
return data if isinstance(data, dict) else {}
return {}


def _claimed_family_and_arch(device: dict) -> tuple:
family = str(device.get("device_family") or device.get("family") or "x86")
arch = str(device.get("device_arch") or device.get("arch") or "default")
return family, arch


def _cpu_brand_string(device: dict) -> str:
return " ".join(
str(device.get(key) or "").strip()
for key in ("cpu", "device_model", "model", "brand")
if str(device.get(key) or "").strip()
).lower()


def _has_any_token(text: str, tokens: set) -> bool:
return any(token in text for token in tokens)


def _claims_powerpc(device: dict) -> bool:
family, arch = _claimed_family_and_arch(device)
family_lower = family.lower()
arch_lower = arch.lower()
return "powerpc" in family_lower or "ppc" in family_lower or arch_lower in POWERPC_ARCHES


def _powerpc_cpu_brand_matches(device: dict) -> bool:
cpu_brand = _cpu_brand_string(device)
if not cpu_brand:
return False
if _has_any_token(cpu_brand, X86_CPU_BRANDS | ARM_CPU_BRANDS):
return False
return any(token in cpu_brand for token in ("powerpc", "ppc", "ibm power", "g3", "g4", "g5", "7447", "7450", "7455", "7448", "970", "power8", "power9"))


def _has_powerpc_simd_evidence(fingerprint: dict) -> bool:
simd_data = _fingerprint_check_data(fingerprint, "simd_identity")
x86_features = simd_data.get("x86_features", [])
if not isinstance(x86_features, list):
x86_features = []
has_x86 = bool(x86_features) or bool(simd_data.get("has_sse")) or bool(simd_data.get("has_avx"))
has_ppc = bool(
simd_data.get("altivec")
or simd_data.get("vsx")
or simd_data.get("vec_perm")
or simd_data.get("has_altivec")
)
return has_ppc and not has_x86


def _has_powerpc_cache_profile(fingerprint: dict) -> bool:
cache_data = _fingerprint_check_data(fingerprint, "cache_timing")
arch_hint = str(cache_data.get("arch") or cache_data.get("architecture") or "").lower()
if "powerpc" in arch_hint or "ppc" in arch_hint:
return True
l2_l1_ratio = float(cache_data.get("l2_l1_ratio", 0.0) or 0.0)
l3_l2_ratio = float(cache_data.get("l3_l2_ratio", 0.0) or 0.0)
hierarchy_ratio = float(cache_data.get("hierarchy_ratio", 0.0) or 0.0)
return (l2_l1_ratio >= 1.05 and l3_l2_ratio >= 1.05) or hierarchy_ratio >= 1.2


def derive_verified_device(device: dict, fingerprint: dict, fingerprint_passed: bool) -> dict:
family, arch = _claimed_family_and_arch(device)
if not _claims_powerpc(device):
return {"device_family": family, "device_arch": arch}

cpu_brand = _cpu_brand_string(device)
simd_data = _fingerprint_check_data(fingerprint, "simd_identity")
if fingerprint_passed and _powerpc_cpu_brand_matches(device) and _has_powerpc_simd_evidence(fingerprint) and _has_powerpc_cache_profile(fingerprint):
return {"device_family": "PowerPC", "device_arch": arch.upper()}

if _has_any_token(cpu_brand, ARM_CPU_BRANDS) or bool(simd_data.get("has_neon")):
return {"device_family": "ARM", "device_arch": "default"}
if _has_any_token(cpu_brand, X86_CPU_BRANDS) or bool(simd_data.get("has_sse")) or bool(simd_data.get("has_avx")):
return {"device_family": "x86_64", "device_arch": "default"}
return {"device_family": "x86", "device_arch": "default"}

# RIP-0146b: Enrollment enforcement config
ENROLL_REQUIRE_TICKET = os.getenv("ENROLL_REQUIRE_TICKET", "1") == "1"
ENROLL_TICKET_TTL_S = int(os.getenv("ENROLL_TICKET_TTL_S", "600"))
Expand Down Expand Up @@ -1112,11 +1208,12 @@ def auto_induct_to_hall(miner: str, device: dict):

def record_attestation_success(miner: str, device: dict, fingerprint_passed: bool = False, source_ip: str = None, signals: dict = None, fingerprint: dict = None):
now = int(time.time())
verified_device = derive_verified_device(device or {}, fingerprint if isinstance(fingerprint, dict) else {}, fingerprint_passed)
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
INSERT OR REPLACE INTO miner_attest_recent (miner, ts_ok, device_family, device_arch, entropy_score, fingerprint_passed, source_ip)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (miner, now, device.get("device_family", device.get("family", "unknown")), device.get("device_arch", device.get("arch", "unknown")), 0.0, 1 if fingerprint_passed else 0, source_ip))
""", (miner, now, verified_device["device_family"], verified_device["device_arch"], 0.0, 1 if fingerprint_passed else 0, source_ip))
_ = append_fingerprint_snapshot(conn, miner, fingerprint if isinstance(fingerprint, dict) else {}, now)
conn.commit()

Expand All @@ -1128,7 +1225,7 @@ def record_attestation_success(miner: str, device: dict, fingerprint_passed: boo
except Exception as _fe:
print(f"[RIP-201] Fleet signal recording warning: {_fe}")
# Auto-induct to Hall of Rust
auto_induct_to_hall(miner, device)
auto_induct_to_hall(miner, verified_device)


TEMPORAL_HISTORY_LIMIT = 10
Expand Down Expand Up @@ -1331,9 +1428,7 @@ def validate_fingerprint_data(fingerprint: dict, claimed_device: dict = None) ->
if not isinstance(fingerprint, dict):
return False, "fingerprint_not_dict"

checks = fingerprint.get("checks", {})
if not isinstance(checks, dict):
checks = {}
checks = _fingerprint_checks_map(fingerprint)
claimed_device = claimed_device if isinstance(claimed_device, dict) else {}

# FIX #305: Reject empty fingerprint payloads (e.g. fingerprint={} or checks={})
Expand Down Expand Up @@ -1466,23 +1561,18 @@ def get_check_status(check_data):
claimed_device.get("arch", "modern")).lower()

# If claiming PowerPC, check for x86-specific signals in fingerprint
if claimed_arch in {"g4", "g5", "g3", "powerpc", "power macintosh"}:
simd_check = checks.get("simd_identity")
if isinstance(simd_check, dict):
simd_data = simd_check.get("data", {})
if not isinstance(simd_data, dict):
simd_data = {}
# x86 SIMD features should NOT be present on PowerPC
x86_features = simd_data.get("x86_features", [])
if x86_features:
print(f"[FINGERPRINT] REJECT: claims {claimed_arch} but has x86 SIMD: {x86_features}")
return False, f"arch_mismatch:claims_{claimed_arch}_has_x86_simd"

# PowerPC should have altivec/vsx indicators
has_ppc_simd = simd_data.get("altivec") or simd_data.get("vsx") or simd_data.get("has_altivec")
# Don't reject if no SIMD data at all (old miners) but log it
if x86_features and not has_ppc_simd:
print(f"[FINGERPRINT] SUSPICIOUS: claims {claimed_arch}, x86 SIMD present, no AltiVec")
if claimed_arch in POWERPC_ARCHES:
if not _powerpc_cpu_brand_matches(claimed_device):
print(f"[FINGERPRINT] REJECT: claims {claimed_arch} but CPU brand does not match PowerPC")
return False, f"cpu_brand_mismatch:claims_{claimed_arch}"

if not _has_powerpc_simd_evidence(fingerprint):
print(f"[FINGERPRINT] REJECT: claims {claimed_arch} but lacks PowerPC SIMD evidence")
return False, f"missing_powerpc_simd:{claimed_arch}"

if not _has_powerpc_cache_profile(fingerprint):
print(f"[FINGERPRINT] REJECT: claims {claimed_arch} but lacks PowerPC cache profile")
return False, f"missing_powerpc_cache_profile:{claimed_arch}"

# ── PHASE 3: ROM fingerprint (retro platforms) ──
rom_passed, rom_data = get_check_status(checks.get("rom_fingerprint"))
Expand Down Expand Up @@ -2308,8 +2398,9 @@ def submit_attestation():
# This eliminates the need for miners to make a separate POST /epoch/enroll call
try:
epoch = slot_to_epoch(current_slot())
family = device.get("device_family") or device.get("family") or "x86"
arch_for_weight = device.get("device_arch") or device.get("arch") or "default"
verified_device = derive_verified_device(device or {}, fingerprint if isinstance(fingerprint, dict) else {}, fingerprint_passed)
family = verified_device["device_family"]
arch_for_weight = verified_device["device_arch"]
hw_weight = HARDWARE_WEIGHTS.get(family, {}).get(arch_for_weight, HARDWARE_WEIGHTS.get(family, {}).get("default", 1.0))

# VM miners get minimal weight
Expand Down
58 changes: 36 additions & 22 deletions tests/test_fingerprint_improved.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,44 @@ def test_valid_clock_drift_passes(self):
class TestVintageHardwareTiming:
"""Test vintage hardware-specific timing requirements."""

@staticmethod
def _verified_g4_checks(cv: float) -> dict:
return {
"anti_emulation": VALID_ANTI_EMULATION,
"clock_drift": {
"passed": True,
"data": {
"cv": cv,
"samples": 100,
},
},
"simd_identity": {
"passed": True,
"data": {
"has_altivec": True,
"has_sse": False,
"has_avx": False,
"vec_perm": True,
},
},
"cache_timing": {
"passed": True,
"data": {
"arch": "powerpc",
"l2_l1_ratio": 1.4,
"l3_l2_ratio": 1.15,
},
},
}

def test_vintage_stability_too_high(self):
"""Verify rejection of suspicious stability on vintage hardware."""
claimed_device = {
"device_arch": "G4"
"device_arch": "G4",
"cpu": "PowerPC G4 7447A",
}
fingerprint = {
"checks": {
"anti_emulation": VALID_ANTI_EMULATION,
"clock_drift": {
"passed": True,
"data": {
"cv": 0.001, # Too stable for G4
"samples": 100
}
}
}
"checks": self._verified_g4_checks(0.001)
}
passed, reason = validate_fingerprint_data(fingerprint, claimed_device)
assert passed is False, "Suspiciously stable vintage timing should fail"
Expand All @@ -337,19 +359,11 @@ def test_vintage_stability_too_high(self):
def test_vintage_normal_variation_passes(self):
"""Normal variation for vintage hardware should pass."""
claimed_device = {
"device_arch": "G4"
"device_arch": "G4",
"cpu": "PowerPC G4 7447A",
}
fingerprint = {
"checks": {
"anti_emulation": VALID_ANTI_EMULATION,
"clock_drift": {
"passed": True,
"data": {
"cv": 0.05, # Normal variation
"samples": 100
}
}
}
"checks": self._verified_g4_checks(0.05)
}
passed, reason = validate_fingerprint_data(fingerprint, claimed_device)
assert passed is True, "Normal vintage timing should pass"
Expand Down
Loading
Loading