diff --git a/allways/chain_providers/base.py b/allways/chain_providers/base.py index d528ba9..d1177ff 100644 --- a/allways/chain_providers/base.py +++ b/allways/chain_providers/base.py @@ -2,6 +2,8 @@ from dataclasses import dataclass from typing import Any, Optional, Tuple +import bittensor as bt + from allways.chains import ChainDefinition @@ -42,17 +44,72 @@ def check_connection(self, **kwargs) -> None: ... @abstractmethod - def verify_transaction( + def fetch_matching_tx( self, tx_hash: str, expected_recipient: str, expected_amount: int, block_hint: int = 0 ) -> Optional[TransactionInfo]: - """Verify a transaction; returns TransactionInfo if found, None if not found, - raises ProviderUnreachableError on transient failures. + """Chain-specific fetch — return TransactionInfo if the tx exists and matches + recipient + amount, otherwise None. Raises ProviderUnreachableError on + transient backend failures. Uses >= for amount (overpayment is acceptable on-chain). block_hint: If > 0, providers can use this for O(1) lookup instead of scanning. + + Not called directly by application code — use ``verify_transaction``, + which wraps this with the common confirmed/sender post-checks. """ ... + def verify_transaction( + self, + tx_hash: str, + expected_recipient: str, + expected_amount: int, + block_hint: int = 0, + expected_sender: Optional[str] = None, + require_confirmed: bool = False, + ) -> Optional[TransactionInfo]: + """Verify a transaction against the shared post-fetch checklist. + + Dispatches to the provider's ``fetch_matching_tx`` for the chain-specific + scan, then applies the common checks every caller cares about: + + - ``require_confirmed`` — if True, reject txs that don't have enough + confirmations for the chain. Default False, because axon/pending-confirm + flows want the partial TransactionInfo so they can queue and retry. + - ``expected_sender`` — if provided, reject txs whose sender doesn't + match. 
Strict: an empty/unparseable sender from the provider also + fails the check, since we can't prove the tx came from the reserved + address. Closing this gap prevents a "malformed-input evades the + defense" class of attack. + + Rejections are logged once in the base so observability for the defense + is in one place instead of duplicated at every call site. + """ + tx_info = self.fetch_matching_tx( + tx_hash=tx_hash, + expected_recipient=expected_recipient, + expected_amount=expected_amount, + block_hint=block_hint, + ) + if tx_info is None: + return None + + if require_confirmed and not tx_info.confirmed: + bt.logging.debug( + f'verify_transaction: tx {tx_hash[:16]}... not yet confirmed ' + f'({tx_info.confirmations}/{self.get_chain().min_confirmations})' + ) + return None + + if expected_sender and tx_info.sender != expected_sender: + bt.logging.warning( + f'verify_transaction: sender mismatch on tx {tx_hash[:16]}... ' + f'(expected {expected_sender}, got {tx_info.sender!r})' + ) + return None + + return tx_info + @abstractmethod def get_balance(self, address: str) -> int: ... 
diff --git a/allways/chain_providers/bitcoin.py b/allways/chain_providers/bitcoin.py index 6a8f71f..5838850 100644 --- a/allways/chain_providers/bitcoin.py +++ b/allways/chain_providers/bitcoin.py @@ -142,10 +142,10 @@ def rpc_call(self, method: str, params: Optional[list] = None) -> Optional[dict] bt.logging.error(f'BTC RPC call failed ({method}): {e}') return None - def verify_transaction( + def fetch_matching_tx( self, tx_hash: str, expected_recipient: str, expected_amount: int, block_hint: int = 0 ) -> Optional[TransactionInfo]: - """Verify a Bitcoin transaction via RPC with Blockstream fallback.""" + """Look up a Bitcoin tx via RPC with Blockstream fallback.""" result = self.rpc_verify_transaction(tx_hash, expected_recipient, expected_amount) if result is not None: return result diff --git a/allways/chain_providers/subtensor.py b/allways/chain_providers/subtensor.py index 194c8f3..6da2763 100644 --- a/allways/chain_providers/subtensor.py +++ b/allways/chain_providers/subtensor.py @@ -165,14 +165,16 @@ def get_block_raw(self, block_num: int, block_hash: str) -> Optional[dict]: bt.logging.debug(f'Raw block fetch failed for block {block_num}: {e}') return None - def verify_transaction( + def fetch_matching_tx( self, tx_hash: str, expected_recipient: str, expected_amount: int, block_hint: int = 0 ) -> Optional[TransactionInfo]: - """Verify a TAO transfer; raises ProviderUnreachableError if subtensor is unreachable. + """Scan for a TAO transfer matching recipient + amount. If block_hint > 0, checks the hinted block ±3. Otherwise scans the last 150 blocks. The ±3 window covers small clock/finality skews between the caller's block_hint and the block the transfer actually landed in. + + Raises ProviderUnreachableError if subtensor is unreachable. 
""" try: current_block = self.subtensor.get_current_block() diff --git a/allways/cli/dendrite_lite.py b/allways/cli/dendrite_lite.py index 3e3276d..3681234 100644 --- a/allways/cli/dendrite_lite.py +++ b/allways/cli/dendrite_lite.py @@ -7,9 +7,12 @@ """ from pathlib import Path +from typing import List, Optional import bittensor as bt +from allways.contract_client import AllwaysContractClient + EPHEMERAL_WALLET_DIR = Path.home() / '.allways' / 'ephemeral_wallet' EPHEMERAL_WALLET_NAME = 'allways_ephemeral' EPHEMERAL_HOTKEY_NAME = 'default' @@ -34,7 +37,11 @@ def get_ephemeral_wallet() -> bt.Wallet: return wallet -def discover_validators(subtensor: bt.Subtensor, netuid: int, contract_client=None) -> list: +def discover_validators( + subtensor: bt.Subtensor, + netuid: int, + contract_client: Optional[AllwaysContractClient] = None, +) -> List[bt.AxonInfo]: """Discover validator axon endpoints from metagraph. Filters for UIDs with validator_permit=True and is_serving=True. @@ -61,7 +68,12 @@ def discover_validators(subtensor: bt.Subtensor, netuid: int, contract_client=No return axons -def broadcast_synapse(wallet: bt.Wallet, axons: list, synapse, timeout: float = 30.0) -> list: +def broadcast_synapse( + wallet: bt.Wallet, + axons: List[bt.AxonInfo], + synapse: bt.Synapse, + timeout: float = 30.0, +) -> list: """Broadcast a synapse to all validator axons via dendrite. Returns list of response synapses. diff --git a/allways/commitments.py b/allways/commitments.py index 0a28071..18c8bde 100644 --- a/allways/commitments.py +++ b/allways/commitments.py @@ -128,17 +128,33 @@ def read_miner_commitment( def read_miner_commitments(subtensor: bt.Subtensor, netuid: int) -> List[MinerPair]: - """Read all miner commitments from chain, parse into MinerPair list.""" - pairs = [] + """Read all miner commitments for the netuid in a single RPC call. + + Uses substrate-interface's ``query_map`` over the ``CommitmentOf`` double map + keyed by ``(netuid, hotkey)``. 
One RPC round-trip returns every committed + hotkey on the subnet — cheaper than the old N-RPC for-loop, matters most + on full validator polling cadence. + """ + pairs: List[MinerPair] = [] try: metagraph = subtensor.metagraph(netuid) - for uid in range(metagraph.n.item()): - hotkey = metagraph.hotkeys[uid] - commitment = get_commitment(subtensor, netuid, hotkey) - if commitment: - pair = parse_commitment_data(commitment, uid=uid, hotkey=hotkey) - if pair: - pairs.append(pair) + hotkey_to_uid = {metagraph.hotkeys[uid]: uid for uid in range(metagraph.n.item())} + result = subtensor.substrate.query_map( + module='Commitments', + storage_function='CommitmentOf', + params=[netuid], + ) + for key, metadata in result: + hotkey = str(key.value) if hasattr(key, 'value') else str(key) + uid = hotkey_to_uid.get(hotkey) + if uid is None: + continue # miner dereg'd but commitment still in storage + commitment = decode_commitment_field(metadata) + if not commitment: + continue + pair = parse_commitment_data(commitment, uid=uid, hotkey=hotkey) + if pair: + pairs.append(pair) except (ConnectionError, TimeoutError) as e: bt.logging.warning(f'Transient error reading commitments: {e}') except Exception as e: diff --git a/allways/constants.py b/allways/constants.py index 1c4b6a7..1f10f15 100644 --- a/allways/constants.py +++ b/allways/constants.py @@ -3,12 +3,11 @@ NETUID_LOCAL = 2 # ─── Contract ────────────────────────────────────────────── -# Default mainnet address; override via CONTRACT_ADDRESS env var for testnets -# or alternate deployments. +# Mainnet default; override via CONTRACT_ADDRESS env var. CONTRACT_ADDRESS = '5FTkUEhRmLPsALn4b7bJpVFhDQqohGbc6khnmA2aiYFLMZYP' # ─── Polling ────────────────────────────────────────────── -# Bittensor base neuron loop heartbeat — not the scoring / forward cadence. +# Bittensor base-neuron heartbeat, not the scoring/forward cadence. 
MINER_POLL_INTERVAL_SECONDS = 12 VALIDATOR_POLL_INTERVAL_SECONDS = 12 @@ -16,68 +15,51 @@ COMMITMENT_VERSION = 1 # ─── Unit Conversions ──────────────────────────────────── -TAO_TO_RAO = 1_000_000_000 # 1 TAO = 10^9 rao -BTC_TO_SAT = 100_000_000 # 1 BTC = 10^8 satoshi +TAO_TO_RAO = 1_000_000_000 +BTC_TO_SAT = 100_000_000 # ─── Rate Encoding ─────────────────────────────────────── -RATE_PRECISION = 10**18 # Fixed-point precision for on-chain rate storage +RATE_PRECISION = 10**18 # ─── Transaction Fees ──────────────────────────────────── MIN_BALANCE_FOR_TX_RAO = 250_000_000 # 0.25 TAO minimum for extrinsic fees -BTC_MIN_FEE_RATE = 2 # sat/vB — minimum BTC fee rate floor to avoid stuck txs +BTC_MIN_FEE_RATE = 2 # sat/vB — floor to avoid stuck txs # ─── Miner ─────────────────────────────────────────────── -# Default cushion the miner applies to every swap's timeout_block before -# deciding to fulfill. Protects against slow dest-chain inclusion eating into -# the timeout window. Overridable via MINER_TIMEOUT_CUSHION_BLOCKS env var. +# Cushion subtracted from each swap's timeout before the miner agrees to +# fulfill, protecting against slow dest-chain inclusion. Overridable via +# MINER_TIMEOUT_CUSHION_BLOCKS. DEFAULT_MINER_TIMEOUT_CUSHION_BLOCKS = 5 # ─── Scoring ───────────────────────────────────────────── -SCORING_WINDOW_BLOCKS = 3600 # ~12 hours at 12s/block -SCORING_INTERVAL_STEPS = 300 # Score every 300 forward passes (~1 hour at 12s poll) -SCORING_EMA_ALPHA = 1.0 # Instantaneous — score based on current window only, no smoothing - -# ─── V1 Crown-Time Scoring ─────────────────────────────── -# Validator throttle: rate_events for a hotkey are only accepted when this many -# blocks have elapsed since the previous accepted event. Prevents rate-war games -# and keeps crown-time attribution stable. -RATE_UPDATE_MIN_INTERVAL_BLOCKS = 75 -# Rate/collateral event retention. 
Must be >= SCORING_WINDOW_BLOCKS so the -# window-start state can always be reconstructed from history. -EVENT_RETENTION_BLOCKS = 2 * SCORING_WINDOW_BLOCKS -# How often the validator polls miner commitments from its local subtensor. -# 15 blocks ≈ 3 min — 1/5 of RATE_UPDATE_MIN_INTERVAL_BLOCKS for good responsiveness -# without hammering the RPC. -COMMITMENT_POLL_INTERVAL_BLOCKS = 15 -# Emission allocation per swap direction. Sum of values is the portion of each -# scoring pass allocated to crown-time winners; 1 - sum() recycles to RECYCLE_UID. +SCORING_WINDOW_BLOCKS = 1200 # ~4 hours at 12s/block — also the scoring cadence +SCORING_EMA_ALPHA = 1.0 # Instantaneous — no smoothing across passes +CREDIBILITY_WINDOW_BLOCKS = 216_000 # ~30 days DIRECTION_POOLS: dict[tuple[str, str], float] = { ('tao', 'btc'): 0.04, ('btc', 'tao'): 0.04, } -# Harsh penalty for unreliable miners: success_rate ** SUCCESS_EXPONENT. -# 100% → 1.0, 90% → 0.729, 80% → 0.512, 50% → 0.125. +# 100% → 1.0, 90% → 0.729, 80% → 0.512, 50% → 0.125 SUCCESS_EXPONENT: int = 3 # ─── Emission Recycling ──────────────────────────────────── -RECYCLE_UID = 53 # Subnet owner UID — emissions recycled on-chain +RECYCLE_UID = 53 # Subnet owner UID # ─── Reservation ───────────────────────────────────────── -RESERVATION_COOLDOWN_BLOCKS = 150 # ~30 min base cooldown on failed reservation (validator-enforced) -RESERVATION_COOLDOWN_MULTIPLIER = 2 # Exponential backoff: 150 → 300 → 600 ... -MAX_RESERVATIONS_PER_ADDRESS = 1 # 1 active reservation per source address (validator-enforced) -EXTEND_THRESHOLD_BLOCKS = 20 # ~4 min — vote to extend reservation when this many blocks remain +RESERVATION_COOLDOWN_BLOCKS = 150 # ~30 min base cooldown on failed reservation +RESERVATION_COOLDOWN_MULTIPLIER = 2 # 150 → 300 → 600 ... 
+MAX_RESERVATIONS_PER_ADDRESS = 1 +EXTEND_THRESHOLD_BLOCKS = 20 # ~4 min — vote to extend when this many blocks remain # ─── Protocol Fee ────────────────────────────────────────── -# Hardcoded 1% protocol fee matching the smart contract's immutable -# FEE_DIVISOR constant. No longer read from chain — both sides pin to 100. +# Hardcoded 1% — matches the contract's immutable FEE_DIVISOR. FEE_DIVISOR = 100 -# ─── Display Only (real values enforced on-chain by contract) ───── -# For CLI display and fallback logic only. Actual values are managed -# via `alw admin` commands and read from the contract at runtime. -MIN_COLLATERAL_TAO = 0.1 # Fallback when the contract min_collateral read fails -DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS = 30 # ~5 min — `alw admin set-timeout` -DEFAULT_MIN_SWAP_AMOUNT_RAO = 100_000_000 # 0.1 TAO — `alw admin set-min-swap` -DEFAULT_MAX_SWAP_AMOUNT_RAO = 500_000_000 # 0.5 TAO — `alw admin set-max-swap` -RESERVATION_TTL_BLOCKS = 30 # ~5 min — `alw admin set-reservation-ttl` +# ─── Display Only ───────────────────────────────────────── +# Fallbacks/defaults for CLI display. Live values are written by `alw admin` +# and read from the contract at runtime. +MIN_COLLATERAL_TAO = 0.1 +DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS = 30 # ~5 min +DEFAULT_MIN_SWAP_AMOUNT_RAO = 100_000_000 # 0.1 TAO +DEFAULT_MAX_SWAP_AMOUNT_RAO = 500_000_000 # 0.5 TAO +RESERVATION_TTL_BLOCKS = 30 # ~5 min diff --git a/allways/miner/fulfillment.py b/allways/miner/fulfillment.py index 9bb158b..5b4e7be 100644 --- a/allways/miner/fulfillment.py +++ b/allways/miner/fulfillment.py @@ -2,6 +2,7 @@ import json import os +from dataclasses import dataclass from pathlib import Path from typing import Dict, Optional, Set, Tuple @@ -14,6 +15,21 @@ from allways.utils.rate import expected_swap_amounts +@dataclass +class SentSwap: + """Persistent record of a destination-chain send for a single swap. 
+ + Created when ``send_dest_funds`` succeeds; ``marked_fulfilled`` flips to + True after the contract accepts ``mark_fulfilled``. A retry after crash + finds this record, skips re-sending (prevents double-sends), and only + re-calls mark_fulfilled if it didn't already succeed. + """ + + to_tx_hash: str + to_tx_block: int + marked_fulfilled: bool + + def load_timeout_cushion_blocks() -> int: """Read MINER_TIMEOUT_CUSHION_BLOCKS from env, falling back to the default. @@ -66,8 +82,7 @@ def __init__( # miner loop when a new rate is posted. Shared dict so the miner # neuron's reload mutates what we read here. self.my_addresses: Dict[str, str] = my_addresses if my_addresses is not None else {} - # swap_id → (to_tx_hash, to_tx_block, marked_fulfilled) - self.sent: Dict[int, Tuple[str, int, bool]] = {} + self.sent: Dict[int, SentSwap] = {} self.sent_cache_path = sent_cache_path self.load_sent_cache() @@ -78,10 +93,11 @@ def load_sent_cache(self): try: data = json.loads(self.sent_cache_path.read_text()) for swap_id_str, entry in data.items(): - # Back-compat: old cache entries were 2-tuples. Treat restored - # entries as not-yet-marked-fulfilled so the retry path runs. 
- marked = bool(entry[2]) if len(entry) >= 3 else False - self.sent[int(swap_id_str)] = (entry[0], entry[1], marked) + self.sent[int(swap_id_str)] = SentSwap( + to_tx_hash=entry[0], + to_tx_block=entry[1], + marked_fulfilled=bool(entry[2]), + ) if self.sent: bt.logging.info(f'Restored {len(self.sent)} cached send(s) from disk') except Exception as e: @@ -93,7 +109,7 @@ def save_sent_cache(self): return try: self.sent_cache_path.parent.mkdir(parents=True, exist_ok=True) - data = {str(k): [v[0], v[1], v[2]] for k, v in self.sent.items()} + data = {str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled] for swap_id, s in self.sent.items()} tmp = self.sent_cache_path.with_suffix('.tmp') tmp.write_text(json.dumps(data)) tmp.rename(self.sent_cache_path) @@ -163,26 +179,11 @@ def verify_user_sent_funds(self, swap: Swap, miner_from_address: str) -> bool: expected_recipient=miner_from_address, expected_amount=swap.from_amount, block_hint=swap.from_tx_block, + expected_sender=swap.user_from_address, + require_confirmed=True, ) - if tx_info is None: - bt.logging.debug(f'Swap {swap.id}: source tx not found or unconfirmed') - return False - - if not tx_info.confirmed: - bt.logging.debug(f'Swap {swap.id}: source tx not yet confirmed') - return False - - # Miner self-protection: don't send dest funds unless the source tx - # actually came from the user address tied to this swap. Validators - # check this too at initiation, but the miner shouldn't trust that - # alone — an exploited or buggy validator quorum shouldn't cost the - # miner their send. 
- if tx_info.sender and tx_info.sender != swap.user_from_address: - bt.logging.warning( - f'Swap {swap.id}: source tx sender mismatch ' - f'(expected {swap.user_from_address}, got {tx_info.sender}) — refusing to fulfill' - ) + bt.logging.debug(f'Swap {swap.id}: source tx not ready (not found, unconfirmed, or sender mismatch)') return False bt.logging.info(f'Swap {swap.id}: source funds verified ({tx_info.amount} from {tx_info.sender})') @@ -225,17 +226,23 @@ def send_dest_funds(self, swap: Swap, user_receives_amount: int) -> Optional[Tup return result def process_swap(self, swap: Swap) -> bool: - """Full swap processing flow: verify safety -> verify funds -> send -> mark fulfilled. - - Idempotent across forward steps. The ``_sent`` cache records both the - dest-tx outcome and whether ``mark_fulfilled`` has already succeeded, so - retry polls don't re-send dest funds and don't re-call the contract. - Cache entries live until ``cleanup_stale_sends`` clears them when the - swap leaves the active set. + """Run the full swap lifecycle for one assigned swap. + + Idempotent across forward steps — the ``sent`` cache tracks both the + dest-tx outcome and whether ``mark_fulfilled`` has landed, so retry + polls never double-send and never double-call the contract. Cache + entries live until ``cleanup_stale_sends`` drops them once the swap + leaves the active set. + + Three possible starting states when this runs: + - no prior record → send dest funds, then mark fulfilled + - prior send, not yet marked → skip send, retry mark fulfilled + - prior send, already marked → nothing to do """ - state = self.sent.get(swap.id) - if state is not None and state[2]: - # mark_fulfilled already succeeded; contract state will catch up. + sent = self.sent.get(swap.id) + if sent and sent.marked_fulfilled: + # Already finished on a previous pass. Contract state catches up + # when validators confirm — nothing to retry here. 
return True bt.logging.info(f'Processing swap {swap.id}: {swap.from_chain} -> {swap.to_chain}') @@ -253,34 +260,33 @@ def process_swap(self, swap: Swap) -> bool: bt.logging.debug(f'Swap {swap.id}: waiting for source funds confirmation') return False - # Step 3: Send destination funds. The ``sent`` cache serves two - # purposes: skip the send on a retry (dest tx already broadcast) and - # skip the mark_fulfilled call on a retry (contract already accepted). - if state is not None: - to_tx_hash, to_tx_block, _ = state - bt.logging.info(f'Swap {swap.id}: retrying mark_fulfilled for cached send tx {to_tx_hash[:16]}...') - else: + # Step 3: Send destination funds — unless we already did on a previous + # pass, in which case we skip straight to the mark_fulfilled retry. + if sent is None: send_result = self.send_dest_funds(swap, user_receives_amount) if not send_result: bt.logging.error(f'Swap {swap.id}: failed to send dest funds') return False to_tx_hash, to_tx_block = send_result - self.sent[swap.id] = (to_tx_hash, to_tx_block, False) + sent = SentSwap(to_tx_hash=to_tx_hash, to_tx_block=to_tx_block, marked_fulfilled=False) + self.sent[swap.id] = sent self.save_sent_cache() + else: + bt.logging.info(f'Swap {swap.id}: retrying mark_fulfilled for cached send tx {sent.to_tx_hash[:16]}...') - # Step 4: Mark fulfilled on contract. We pass user_receives_amount as - # to_amount because at mark_fulfilled time the contract expects the - # actual sent amount (post-fee), which is what `swap.to_amount` will - # be set to after the call. + # Step 4: Mark fulfilled on contract. We pass ``user_receives_amount`` + # as ``to_amount`` because at mark_fulfilled time the contract stores + # the actual sent amount (post-fee), which is what ``swap.to_amount`` + # becomes after the call. 
try: self.client.mark_fulfilled( wallet=self.wallet, swap_id=swap.id, - to_tx_hash=to_tx_hash, + to_tx_hash=sent.to_tx_hash, to_amount=user_receives_amount, - to_tx_block=to_tx_block, + to_tx_block=sent.to_tx_block, ) - self.sent[swap.id] = (to_tx_hash, to_tx_block, True) + sent.marked_fulfilled = True self.save_sent_cache() bt.logging.success(f'Swap {swap.id}: marked as fulfilled') return True diff --git a/allways/utils/rate.py b/allways/utils/rate.py index a5e9424..b81ee33 100644 --- a/allways/utils/rate.py +++ b/allways/utils/rate.py @@ -4,6 +4,7 @@ from typing import Tuple from allways.chains import canonical_pair, get_chain +from allways.classes import Swap from allways.constants import RATE_PRECISION @@ -50,7 +51,7 @@ def calculate_to_amount( return from_amount * rate_fixed // (RATE_PRECISION * 10 ** (-decimal_diff)) -def expected_swap_amounts(swap, fee_divisor: int) -> Tuple[int, int]: +def expected_swap_amounts(swap: Swap, fee_divisor: int) -> Tuple[int, int]: """Compute expected to_amount and fee-adjusted user_receives from a swap's on-chain fields. Single source of truth used by both miner (fulfillment) and validator (verification). diff --git a/allways/validator/axon_handlers.py b/allways/validator/axon_handlers.py index a00204f..e2fa5ab 100644 --- a/allways/validator/axon_handlers.py +++ b/allways/validator/axon_handlers.py @@ -9,18 +9,22 @@ to inject the validator context. 
""" -from typing import Tuple +from typing import TYPE_CHECKING, Optional, Tuple import bittensor as bt from Crypto.Hash import keccak from substrateinterface import Keypair +from allways.classes import MinerPair from allways.commitments import read_miner_commitment from allways.constants import RESERVATION_COOLDOWN_BLOCKS from allways.contract_client import AllwaysContractClient, ContractError, compact_encode_len, is_contract_rejection from allways.synapses import MinerActivateSynapse, SwapConfirmSynapse, SwapReserveSynapse from allways.validator.state_store import PendingConfirm +if TYPE_CHECKING: + from neurons.validator import Validator + def keccak256(data: bytes) -> bytes: """Compute Keccak-256 hash (matches ink::env::hash::Keccak256).""" @@ -117,7 +121,11 @@ def encode_str(s: str) -> bytes: ) -def resolve_swap_direction(commitment, synapse_from_chain: str, synapse_to_chain: str): +def resolve_swap_direction( + commitment: MinerPair, + synapse_from_chain: str, + synapse_to_chain: str, +) -> Optional[Tuple[str, str, str, str, float, str]]: """Resolve deposit/fulfillment addresses and rate from commitment and requested direction. Returns (from_chain, to_chain, deposit_addr, fulfillment_addr, rate, rate_str) or None. @@ -133,7 +141,7 @@ def resolve_swap_direction(commitment, synapse_from_chain: str, synapse_to_chain return from_chain, to_chain, deposit_addr, fulfillment_addr, rate, rate_str -def load_swap_commitment(validator, miner_hotkey: str): +def load_swap_commitment(validator: 'Validator', miner_hotkey: str) -> Optional[MinerPair]: """Read miner commitment and validate chains differ. 
Returns commitment or None.""" commitment = read_miner_commitment( subtensor=validator.axon_subtensor, @@ -145,7 +153,7 @@ def load_swap_commitment(validator, miner_hotkey: str): return commitment -def reject_synapse(synapse, reason: str, context: str = '') -> None: +def reject_synapse(synapse: bt.Synapse, reason: str, context: str = '') -> None: """Mark a synapse as rejected with a reason and debug log.""" synapse.accepted = False synapse.rejection_reason = reason @@ -159,7 +167,7 @@ def reject_synapse(synapse, reason: str, context: str = '') -> None: async def blacklist_miner_activate( - validator, + validator: 'Validator', synapse: MinerActivateSynapse, ) -> Tuple[bool, str]: """Reject synapses from unregistered hotkeys.""" @@ -172,7 +180,7 @@ async def blacklist_miner_activate( async def priority_miner_activate( - validator, + validator: 'Validator', synapse: MinerActivateSynapse, ) -> float: """Priority by stake — higher stake processed first.""" @@ -184,7 +192,7 @@ async def priority_miner_activate( async def handle_miner_activate( - validator, + validator: 'Validator', synapse: MinerActivateSynapse, ) -> MinerActivateSynapse: """Process miner activation: verify commitment + vote on contract.""" @@ -240,7 +248,7 @@ async def handle_miner_activate( async def blacklist_swap_reserve( - validator, + validator: 'Validator', synapse: SwapReserveSynapse, ) -> Tuple[bool, str]: """Pass-through — custom field checks happen in forward handler. 
@@ -253,7 +261,7 @@ async def blacklist_swap_reserve( async def priority_swap_reserve( - validator, + validator: 'Validator', synapse: SwapReserveSynapse, ) -> float: """Flat priority for user requests.""" @@ -261,7 +269,7 @@ async def priority_swap_reserve( async def handle_swap_reserve( - validator, + validator: 'Validator', synapse: SwapReserveSynapse, ) -> SwapReserveSynapse: """Validate swap reservation request and vote on contract.""" @@ -402,7 +410,7 @@ async def handle_swap_reserve( async def blacklist_swap_confirm( - validator, + validator: 'Validator', synapse: SwapConfirmSynapse, ) -> Tuple[bool, str]: """Pass-through — custom field checks happen in forward handler. @@ -413,7 +421,7 @@ async def blacklist_swap_confirm( async def priority_swap_confirm( - validator, + validator: 'Validator', synapse: SwapConfirmSynapse, ) -> float: """Flat priority for user requests.""" @@ -421,7 +429,7 @@ async def priority_swap_confirm( async def handle_swap_confirm( - validator, + validator: 'Validator', synapse: SwapConfirmSynapse, ) -> SwapConfirmSynapse: """Verify source transaction and vote to initiate swap.""" @@ -472,25 +480,18 @@ async def handle_swap_confirm( reject_synapse(synapse, f'Unsupported chain: {swap_from_chain}', ctx) return synapse + # Defend against user-snipes-miner by passing expected_sender: a user + # could otherwise reserve a miner and claim any third-party tx of the + # right amount to the miner's address. The base provider wraps this + # check; the specific rejection reason is logged there at warning level. 
tx_info = provider.verify_transaction( tx_hash=synapse.from_tx_hash, expected_recipient=miner_from_address, expected_amount=res_source_amount, + expected_sender=synapse.from_address, ) if tx_info is None: - reject_synapse(synapse, 'Source transaction not found or amount mismatch', ctx) - return synapse - - # Defend against user-snipes-miner: reject if the source tx wasn't - # actually sent by the address the user proved ownership of at reserve - # time. Without this, a user could reserve a miner and then submit any - # unrelated third-party tx of the right amount to the miner's address. - if tx_info.sender and tx_info.sender != synapse.from_address: - reject_synapse( - synapse, - f'Source tx sender mismatch (expected {synapse.from_address}, got {tx_info.sender})', - ctx, - ) + reject_synapse(synapse, 'Source transaction not found, amount or sender mismatch', ctx) return synapse if not tx_info.confirmed: diff --git a/allways/validator/chain_verification.py b/allways/validator/chain_verification.py index 6644720..182d659 100644 --- a/allways/validator/chain_verification.py +++ b/allways/validator/chain_verification.py @@ -5,7 +5,7 @@ import bittensor as bt -from allways.chain_providers.base import ChainProvider, ProviderUnreachableError +from allways.chain_providers.base import ChainProvider, ProviderUnreachableError, TransactionInfo from allways.classes import Swap from allways.utils.rate import expected_swap_amounts @@ -42,7 +42,13 @@ def verify_tx( block_hint: int = 0, expected_sender: str = '', ) -> bool: - """Verify a confirmed transaction on a specific chain.""" + """Verify a confirmed transaction on a specific chain. + + Defers tx lookup, amount, and sender checks to the provider's + ``verify_transaction`` so the defense lives in one place shared with + the miner and axon flows. Keeps the rate-limited confirmations debug + log here because it's specific to the validator polling loop. 
+ """ provider = self.providers.get(chain) if not provider: bt.logging.warning(f'Swap {swap.id}: no provider for chain {chain}') @@ -58,28 +64,16 @@ def verify_tx( expected_recipient=expected_recipient, expected_amount=expected_amount, block_hint=block_hint, + expected_sender=expected_sender or None, ) if tx_info is None: bt.logging.debug( f'Swap {swap.id}: verify_transaction returned None on {chain} ' f'(tx={tx_hash[:16]}... block_hint={block_hint})' ) - elif not tx_info.confirmed: - log_key = f'{swap.id}:{chain}' - prev_confs = self.last_logged_confs.get(log_key) - if prev_confs != tx_info.confirmations: - self.last_logged_confs[log_key] = tx_info.confirmations - bt.logging.debug( - f'Swap {swap.id}: tx found but not confirmed on {chain} ' - f'(confs={tx_info.confirmations} tx={tx_hash[:16]}... ' - f'addr={expected_recipient[:16]}... expected={expected_amount})' - ) - if tx_info is None or not tx_info.confirmed: return False - if expected_sender and tx_info.sender != expected_sender: - bt.logging.warning( - f'Swap {swap.id}: sender mismatch on {chain} — expected {expected_sender}, got {tx_info.sender}' - ) + if not tx_info.confirmed: + self.log_confs_progress(swap.id, chain, tx_hash, tx_info, expected_recipient, expected_amount) return False return True except ProviderUnreachableError: @@ -88,6 +82,26 @@ def verify_tx( bt.logging.error(f'Swap {swap.id}: verification error on {chain}: {e}') return False + def log_confs_progress( + self, + swap_id: int, + chain: str, + tx_hash: str, + tx_info: TransactionInfo, + expected_recipient: str, + expected_amount: int, + ) -> None: + """Rate-limited debug log for confirmations progress on unconfirmed txs.""" + log_key = f'{swap_id}:{chain}' + if self.last_logged_confs.get(log_key) == tx_info.confirmations: + return + self.last_logged_confs[log_key] = tx_info.confirmations + bt.logging.debug( + f'Swap {swap_id}: tx found but not confirmed on {chain} ' + f'(confs={tx_info.confirmations} tx={tx_hash[:16]}... 
' + f'addr={expected_recipient[:16]}... expected={expected_amount})' + ) + async def verify_miner_fulfillment(self, swap: Swap) -> bool: """Verify rate, to_amount, user send, and miner fulfillment. diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index 54e83f4..48d91cc 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -1,35 +1,12 @@ """ContractEventWatcher — event-sourced miner state for the validator. -The scoring path used to poll per-miner collateral / active flag / min_collateral -from the contract on a cadence. That was N RPC calls per poll, the active flag -was never actually checked, and two validators polling at different forward -steps could derive slightly different state for the same block range. - -This watcher sources the same state from ``Contracts::ContractEmitted`` events -on the Substrate chain. Each forward step calls ``sync_to(current_block)``; the -watcher replays events from its internal cursor up to ``current_block``, -decoding them against ``allways_swap_manager.json`` and applying them to -in-memory dicts: - -- ``collateral[hotkey]`` — current collateral in rao (from CollateralPosted, - CollateralWithdrawn, CollateralSlashed) -- ``active_miners: Set[hotkey]`` — miners with ``miner_active == True`` - (from MinerActivated events) -- ``min_collateral`` — current minimum collateral threshold (from - ConfigUpdated{key="min_collateral"} events) -- ``collateral_events`` — ordered history used by the crown-time scoring - replay, bounded to ``2 * SCORING_WINDOW_BLOCKS`` -- swap outcomes are forwarded into ``ValidatorStateStore.insert_swap_outcome`` - so the credibility ledger survives restarts - -Cold start: every run backfills from ``max(0, head - 2 * SCORING_WINDOW_BLOCKS)``. -No cursor file. The scoring window only needs one window of history; the -swap_outcomes ledger is the single piece of state that must persist across -restarts and it already lives in ``state.db``. 
- -Decoder: ported from ``alw-utils/.../watch_contract_events.py`` which has been -in production on the dashboard side. Falls back to a hardcoded topic→event -registry if the metadata JSON can't be loaded. +Each forward step calls ``sync_to(current_block)``; the watcher replays +``Contracts::ContractEmitted`` events from its cursor up to ``current_block`` +and applies them to in-memory state used by the crown-time scoring replay. +Cold start backfills one scoring window so the first scoring pass after a +restart already has a populated history. Swap outcomes are forwarded into +``ValidatorStateStore.insert_swap_outcome`` so the credibility ledger +survives restarts. """ from __future__ import annotations @@ -239,32 +216,21 @@ class CollateralEvent: block: int +@dataclass +class BusyEvent: + """``delta`` is +1 on SwapInitiated and -1 on SwapCompleted/SwapTimedOut. + A miner is busy (excluded from crown) whenever the running sum is > 0.""" + + hotkey: str + delta: int + block: int + + MAX_BLOCKS_PER_SYNC = 50 class ContractEventWatcher: - """Replays contract events into in-memory miner state. - - Usage: - watcher = ContractEventWatcher(substrate, contract_address, metadata_path, state_store) - watcher.initialize(current_block, metagraph_hotkeys, contract_client) - ... every forward step ... - watcher.sync_to(current_block) - - Scoring reads ``get_collateral_events_in_range``, ``get_latest_collateral_before``, - ``active_miners``, ``min_collateral`` directly off the watcher. Swap outcomes - are forwarded into ``state_store.insert_swap_outcome`` so the credibility - ledger persists across restarts. - - ``initialize`` snapshots current on-chain state for every metagraph miner, - then advances the cursor to ``current_block``. From that point forward only - events drive state changes. This avoids the trap where a miner who posted - collateral before the replay window would look like they had zero. 
- - ``sync_to`` is bounded to ``MAX_BLOCKS_PER_SYNC`` per call so a long outage - doesn't block the forward loop — the cursor catches up over multiple - forward steps at ~50 blocks per tick. - """ + """Replays contract events into in-memory miner state.""" def __init__( self, @@ -283,25 +249,19 @@ def __init__( self.collateral: Dict[str, int] = {} self.active_miners: Set[str] = set() self.min_collateral: int = default_min_collateral - # Sorted-by-block history used by crown-time replay. Bounded to - # 2x the scoring window; older entries are dropped on sync. self.collateral_events: List[CollateralEvent] = [] - # Per-hotkey view of collateral_events for O(log n) latest-before - # lookups during scoring replay. Kept in sync with the flat list. + # Per-hotkey view of collateral_events for O(log n) latest-before lookups. self.collateral_events_by_hotkey: Dict[str, List[CollateralEvent]] = {} + self.open_swap_count: Dict[str, int] = {} + self.busy_events: List[BusyEvent] = [] # ─── Public API consumed by scoring ───────────────────────────────── def get_latest_collateral_before(self, hotkey: str, block: int) -> Optional[Tuple[int, int]]: - """Most recent collateral event for ``hotkey`` at or before ``block``. - - O(log n) via binary search on the per-hotkey event list. If no events - exist for the hotkey at all (bootstrap-only miners), returns the - static snapshot at block 0 — that's the authoritative pre-event - state. If events exist but none fall at/before ``block``, returns - None: the snapshot reflects state AFTER the existing events and is - not valid for queries in the pre-event gap. - """ + """Most recent collateral for ``hotkey`` at or before ``block``. 
If + events exist but none fall at/before ``block``, returns None — the + bootstrap snapshot reflects post-event state and is invalid for the + pre-event gap.""" from bisect import bisect_right events = self.collateral_events_by_hotkey.get(hotkey) @@ -324,6 +284,26 @@ def get_collateral_events_in_range(self, start_block: int, end_block: int) -> Li out.append({'hotkey': ev.hotkey, 'collateral_rao': ev.collateral_rao, 'block': ev.block}) return out + def get_busy_events_in_range(self, start_block: int, end_block: int) -> List[dict]: + out: List[dict] = [] + for ev in self.busy_events: + if ev.block <= start_block: + continue + if ev.block > end_block: + break + out.append({'hotkey': ev.hotkey, 'delta': ev.delta, 'block': ev.block}) + return out + + def get_busy_miners_at(self, block: int) -> Dict[str, int]: + """Per-hotkey open-swap count at ``block``, reconstructed by replaying + every delta at or before ``block``.""" + counts: Dict[str, int] = {} + for ev in self.busy_events: + if ev.block > block: + break + counts[ev.hotkey] = counts.get(ev.hotkey, 0) + ev.delta + return {hk: c for hk, c in counts.items() if c > 0} + # ─── Sync loop ────────────────────────────────────────────────────── def initialize( @@ -333,13 +313,8 @@ def initialize( contract_client: Any = None, ) -> None: """Cold start: snapshot contract state for every metagraph miner, then - advance the cursor so only forward events drive state changes. - - Callers should pass ``metagraph_hotkeys`` and a read-capable - ``contract_client``. If either is missing (e.g. in unit tests) the - watcher falls back to an empty snapshot and starts at ``current_block`` - — scoring will simply not credit any miner until events arrive. 
- """ + rewind the cursor by one scoring window so ``sync_to`` backfills it + before the first scoring pass runs.""" if metagraph_hotkeys and contract_client is not None: for hotkey in metagraph_hotkeys: try: @@ -360,20 +335,33 @@ def initialize( self.min_collateral = raw_min except Exception as e: bt.logging.debug(f'EventWatcher bootstrap: min_collateral read failed: {e}') + # Without this seed, a miner already serving a swap at startup + # would be treated as idle until the next terminal event. + try: + in_flight = contract_client.get_active_swaps() or [] + seen_hotkeys = set() + for swap in in_flight: + hk = getattr(swap, 'miner_hotkey', '') + init_block = getattr(swap, 'initiated_block', current_block) + if not hk: + continue + seen_hotkeys.add(hk) + self.open_swap_count[hk] = self.open_swap_count.get(hk, 0) + 1 + self.busy_events.append(BusyEvent(hotkey=hk, delta=+1, block=init_block)) + if seen_hotkeys: + self.busy_events.sort(key=lambda ev: ev.block) + bt.logging.info(f'EventWatcher bootstrap: seeded {len(seen_hotkeys)} miners as busy from contract') + except Exception as e: + bt.logging.debug(f'EventWatcher bootstrap: active swaps read failed: {e}') bt.logging.info( f'EventWatcher initialized: {len(self.collateral)} collateral entries, ' f'{len(self.active_miners)} active miners, min_collateral={self.min_collateral}' ) - self.cursor = current_block + self.cursor = max(0, current_block - SCORING_WINDOW_BLOCKS) def sync_to(self, current_block: int) -> None: - """Catch up from cursor to ``current_block`` in bounded chunks. - - At most ``MAX_BLOCKS_PER_SYNC`` blocks are processed per call so a - multi-minute outage doesn't freeze the forward loop on one sync. The - cursor advances incrementally across forward steps until it catches - up to head. 
- """ + """Catch up from cursor to ``current_block`` in MAX_BLOCKS_PER_SYNC + chunks so a long outage doesn't freeze the forward loop.""" if current_block <= self.cursor: return end = min(current_block, self.cursor + MAX_BLOCKS_PER_SYNC) @@ -441,9 +429,8 @@ def decode_contract_event(self, event_record: Any) -> Optional[Tuple[str, Dict[s def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None: if name == 'CollateralPosted': - # `total` is the authoritative post-event balance — use it as a - # SET so we don't drift when the replay window is missing prior - # events. Fall back to an add if `total` isn't present. + # Prefer ``total`` (authoritative post-event balance) so we don't + # drift when the replay window misses prior events. hotkey = values.get('miner', '') total = values.get('total') if total is not None: @@ -458,9 +445,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None else: self.adjust_collateral(block_num, hotkey, -int(values.get('amount', 0))) elif name == 'CollateralSlashed': - # Slashed has no `total` / `remaining` field — it only carries the - # slash amount. Subtract from the current snapshot (which was - # seeded at initialize() or updated by a prior Posted/Withdrawn). + # Slashed only carries the slash amount, no post-event balance. 
self.adjust_collateral(block_num, values.get('miner', ''), -int(values.get('amount', 0))) elif name == 'MinerActivated': hotkey = values.get('miner', '') @@ -476,6 +461,10 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None self.min_collateral = int(values.get('value', 0)) except (TypeError, ValueError): pass + elif name == 'SwapInitiated': + miner = values.get('miner', '') + if miner: + self.apply_busy_delta(block_num, miner, +1) elif name == 'SwapCompleted': swap_id = values.get('swap_id') miner = values.get('miner', '') @@ -486,6 +475,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None completed=True, resolved_block=block_num, ) + self.apply_busy_delta(block_num, miner, -1) elif name == 'SwapTimedOut': swap_id = values.get('swap_id') miner = values.get('miner', '') @@ -496,9 +486,21 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None completed=False, resolved_block=block_num, ) + self.apply_busy_delta(block_num, miner, -1) + + def apply_busy_delta(self, block_num: int, hotkey: str, delta: int) -> None: + """Apply a ±1 transition. 
Drops any -1 with no matching prior +1 + rather than letting the open-swap count go negative.""" + if delta == 0: + return + current = self.open_swap_count.get(hotkey, 0) + new_count = current + delta + if new_count < 0: + return + self.open_swap_count[hotkey] = new_count + self.busy_events.append(BusyEvent(hotkey=hotkey, delta=delta, block=block_num)) def set_collateral(self, block_num: int, hotkey: str, new_total: int) -> None: - """Record an authoritative post-event collateral balance for ``hotkey``.""" if not hotkey: return new_total = max(0, new_total) @@ -508,20 +510,33 @@ def set_collateral(self, block_num: int, hotkey: str, new_total: int) -> None: self.collateral_events_by_hotkey.setdefault(hotkey, []).append(event) def adjust_collateral(self, block_num: int, hotkey: str, delta: int) -> None: - """Add a delta to the current collateral snapshot and emit an event row.""" if not hotkey: return new_total = max(0, self.collateral.get(hotkey, 0) + delta) self.set_collateral(block_num, hotkey, new_total) def prune_old_collateral_events(self, current_block: int) -> None: - cutoff = current_block - 2 * SCORING_WINDOW_BLOCKS - if cutoff <= 0 or not self.collateral_events: + """Drop collateral and busy events older than one scoring window. 
The + latest collateral row per hotkey is preserved as a state-reconstruction + anchor; busy events are kept while the open-swap count is still > 0 + so the matching -1 isn't orphaned.""" + cutoff = current_block - SCORING_WINDOW_BLOCKS + if cutoff <= 0: return - self.collateral_events = [ev for ev in self.collateral_events if ev.block >= cutoff] - for hotkey, events in list(self.collateral_events_by_hotkey.items()): - pruned = [ev for ev in events if ev.block >= cutoff] - if pruned: - self.collateral_events_by_hotkey[hotkey] = pruned - else: - del self.collateral_events_by_hotkey[hotkey] + if self.collateral_events: + latest_per_hotkey = {} + for ev in self.collateral_events: + latest_per_hotkey[ev.hotkey] = ev # last write wins (events are append-order) + self.collateral_events = [ + ev for ev in self.collateral_events if ev.block >= cutoff or latest_per_hotkey.get(ev.hotkey) is ev + ] + for hotkey, events in list(self.collateral_events_by_hotkey.items()): + latest = events[-1] if events else None + pruned = [ev for ev in events if ev.block >= cutoff or ev is latest] + if pruned: + self.collateral_events_by_hotkey[hotkey] = pruned + else: + del self.collateral_events_by_hotkey[hotkey] + if self.busy_events: + open_now = {hk for hk, c in self.open_swap_count.items() if c > 0} + self.busy_events = [ev for ev in self.busy_events if ev.block >= cutoff or ev.hotkey in open_now] diff --git a/allways/validator/forward.py b/allways/validator/forward.py index 3eccca3..ae63898 100644 --- a/allways/validator/forward.py +++ b/allways/validator/forward.py @@ -1,26 +1,16 @@ -"""Validator forward pass - scoring entry point.""" +"""Validator forward pass — orchestrator called every step by the base neuron.""" from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Set import bittensor as bt -import numpy as np from allways.chain_providers.base import ProviderUnreachableError from 
allways.classes import SwapStatus from allways.commitments import read_miner_commitments -from allways.constants import ( - COMMITMENT_POLL_INTERVAL_BLOCKS, - DIRECTION_POOLS, - EVENT_RETENTION_BLOCKS, - EXTEND_THRESHOLD_BLOCKS, - RECYCLE_UID, - SCORING_INTERVAL_STEPS, - SCORING_WINDOW_BLOCKS, - SUCCESS_EXPONENT, -) +from allways.constants import EXTEND_THRESHOLD_BLOCKS, SCORING_WINDOW_BLOCKS from allways.contract_client import ContractError, is_contract_rejection from allways.utils.logging import log_on_change from allways.validator import voting @@ -30,7 +20,8 @@ scale_encode_initiate_hash_input, ) from allways.validator.chain_verification import SwapVerifier -from allways.validator.state_store import ValidatorStateStore +from allways.validator.scoring import score_and_reward_miners +from allways.validator.state_store import PendingConfirm from allways.validator.swap_tracker import SwapTracker if TYPE_CHECKING: @@ -38,166 +29,57 @@ async def forward(self: Validator) -> None: - """Main validator forward pass. - - Called by BaseValidatorNeuron.concurrent_forward() each step. - - Flow: - 1. Process pending confirmations (queued by axon handler, awaiting tx confirmations) - 2. Commitment poll (rates) - 3. Event watcher sync (collateral, active flag, min_collateral, swap outcomes) - 4. Poll tracker for new/updated swaps (incremental) - 5. For FULFILLED swaps, verify both sides -> confirm_swap - 6. For FULFILLED swaps near timeout with unconfirmed dest tx -> extend timeout - 7. For ACTIVE/FULFILLED past timeout -> timeout_swap (single trigger) - 8. Every SCORING_INTERVAL_STEPS, score from in-memory window - """ + """One validator forward step. 
Phase order matters — each phase may depend + on state mutated by the previous one.""" bt.logging.info(f'Forward step {self.step}') tracker: SwapTracker = self.swap_tracker verifier: SwapVerifier = self.swap_verifier clear_provider_caches(self) - self.state_store.purge_expired_pending() + self.state_store.purge_expired_pending_confirms() + initialize_pending_user_reservations(self) + poll_commitments(self) + try: self.event_watcher.sync_to(self.block) except Exception as e: bt.logging.warning(f'Event watcher sync failed: {e}') + + # Pull newly-initiated and resolved swaps off the contract. await tracker.poll(self.block) - uncertain = await confirm_miner_fulfillments(self, tracker, verifier, self.block) + + # Verify FULFILLED swaps end-to-end and vote confirm_swap. The returned + # set is swap IDs where the provider was unreachable this cycle, so the + # timeout phase knows to skip them (transient outage shouldn't slash). + uncertain_swaps = await confirm_miner_fulfillments(self, tracker, verifier, self.block) + extend_fulfilled_near_timeout(self) - enforce_swap_timeouts(self, tracker, uncertain) + enforce_swap_timeouts(self, tracker, uncertain_swaps) - if self.step % SCORING_INTERVAL_STEPS == 0: - run_scoring_pass(self) + if self.step % SCORING_WINDOW_BLOCKS == 0: + score_and_reward_miners(self) def clear_provider_caches(self: Validator) -> None: - """Clear per-poll caches on chain providers.""" for provider in self.chain_providers.values(): if hasattr(provider, 'clear_cache'): provider.clear_cache() -def poll_commitments(self: Validator) -> None: - """Rate-side validator tick. - - Three independent steps run at ``COMMITMENT_POLL_INTERVAL_BLOCKS`` cadence: - - 1. ``prune_aged_rate_events`` — trim history older than the retention - window so the SQLite tables stay bounded. - 2. ``refresh_miner_rates`` — read all miner commitments from the local - subtensor and persist direction-level diffs. - 3. 
``purge_deregistered_hotkeys`` — drop any hotkeys that have left the - metagraph since the last poll, both from the store and the in-memory - cache. - - Kept as a thin orchestrator so each concern can be tested and reasoned - about independently. - """ - if self.block - self.last_commitment_poll_block < COMMITMENT_POLL_INTERVAL_BLOCKS: - return - self.last_commitment_poll_block = self.block - - prune_aged_rate_events(self) - refresh_miner_rates(self) - purge_deregistered_hotkeys(self) - - -def prune_aged_rate_events(self: Validator) -> None: - """Delete rate/collateral events older than ``EVENT_RETENTION_BLOCKS``. - - Retention is deliberately 2× the scoring window so ``get_latest_*_before`` - calls at the window start can always find prior state to reconstruct from. - """ - cutoff = self.block - EVENT_RETENTION_BLOCKS - if cutoff > 0: - self.state_store.prune_events_older_than(cutoff) - - -def refresh_miner_rates(self: Validator) -> None: - """Pull all miner commitments and persist direction-level rate diffs. - - Rate events that match the cached ``_last_known_rates`` value are skipped - entirely. Rate events accepted by the store update the cache; throttled or - deduped inserts still update the cache so we don't repeatedly retry the - same blocked write on every subsequent poll. 
- """ - try: - pairs = read_miner_commitments(self.subtensor, self.config.netuid) - except Exception as e: - bt.logging.warning(f'Commitment poll failed: {e}') - return - - current_hotkeys = set(self.metagraph.hotkeys) - - for pair in pairs: - if pair.hotkey not in current_hotkeys: - continue - for from_c, to_c, r in ( - (pair.from_chain, pair.to_chain, pair.rate), - (pair.to_chain, pair.from_chain, pair.counter_rate), - ): - if r <= 0: - continue # miner opted out of this direction - key = (pair.hotkey, from_c, to_c) - if self.last_known_rates.get(key) == r: - continue - self.state_store.insert_rate_event( - hotkey=pair.hotkey, - from_chain=from_c, - to_chain=to_c, - rate=r, - block=self.block, - ) - self.last_known_rates[key] = r - - -def purge_deregistered_hotkeys(self: Validator) -> None: - """Drop rates/collateral/outcomes for hotkeys that left the metagraph.""" - current_hotkeys = set(self.metagraph.hotkeys) - stale = {hk for (hk, _, _) in self.last_known_rates.keys()} - current_hotkeys - if not stale: - return - for hk in stale: - self.state_store.delete_hotkey(hk) - self.last_known_rates = {k: v for k, v in self.last_known_rates.items() if k[0] not in stale} - - -def try_extend_reservation(self: Validator, item, current_block: int, swap_label: str, miner_short: str) -> None: - """Vote to extend reservation if nearing expiry, protecting users during provider outages.""" - from substrateinterface import Keypair - - try: - reserved_until = self.contract_client.get_miner_reserved_until(item.miner_hotkey) - blocks_left = reserved_until - current_block - if reserved_until < current_block + EXTEND_THRESHOLD_BLOCKS: - miner_bytes = bytes.fromhex(Keypair(ss58_address=item.miner_hotkey).public_key.hex()) - extend_hash = keccak256(scale_encode_extend_hash_input(miner_bytes, item.from_tx_hash)) - self.contract_client.vote_extend_reservation( - wallet=self.wallet, - request_hash=extend_hash, - miner_hotkey=item.miner_hotkey, - from_tx_hash=item.from_tx_hash, - ) - 
bt.logging.info( - f'PendingConfirm [{swap_label} {miner_short}]: ' - f'voted to extend reservation ({blocks_left} blocks remaining)' - ) - except ContractError as e: - if 'AlreadyVoted' not in str(e): - bt.logging.debug(f'PendingConfirm [{swap_label} {miner_short}]: extend vote: {e}') - except Exception as e: - bt.logging.debug(f'PendingConfirm [{swap_label} {miner_short}]: extend check failed: {e}') - - def initialize_pending_user_reservations(self: Validator) -> None: """Check queued unconfirmed txs and vote_initiate when confirmations are met.""" from substrateinterface import Keypair items = self.state_store.get_all() + # Drop extend-reservation vote receipts whose pending_confirm has been + # removed (vote_initiate landed, tx not found, expired, etc.). + live_keys = {(item.miner_hotkey, item.from_tx_hash) for item in items} + for stale_key in [k for k in self.extend_reservation_voted_at if k not in live_keys]: + del self.extend_reservation_voted_at[stale_key] + if not items: return @@ -213,7 +95,6 @@ def initialize_pending_user_reservations(self: Validator) -> None: chain_def = self.chain_providers.get(item.from_chain) min_confs = chain_def.get_chain().min_confirmations if chain_def else '?' 
- # Skip if swap already initiated (another validator reached quorum) try: if self.contract_client.get_miner_has_active_swap(item.miner_hotkey): self.state_store.remove(item.miner_hotkey) @@ -222,7 +103,6 @@ def initialize_pending_user_reservations(self: Validator) -> None: except Exception as e: bt.logging.warning(f'PendingConfirm [{swap_label} {miner_short}]: active swap check failed: {e}') - # Re-verify tx with main-loop chain provider provider = self.chain_providers.get(item.from_chain) if provider is None: self.state_store.remove(item.miner_hotkey) @@ -252,14 +132,6 @@ def initialize_pending_user_reservations(self: Validator) -> None: ) continue - if tx_info.sender and tx_info.sender != item.from_address: - self.state_store.remove(item.miner_hotkey) - bt.logging.warning( - f'PendingConfirm [{swap_label} {miner_short}]: sender mismatch ' - f'(expected {item.from_address}, got {tx_info.sender}), dropping' - ) - continue - log_on_change( f'confs:{item.miner_hotkey}', tx_info.confirmations, @@ -267,15 +139,13 @@ def initialize_pending_user_reservations(self: Validator) -> None: f'{tx_info.confirmations}/{min_confs} confirmations, tx={item.from_tx_hash[:16]}...', ) - try_extend_reservation(self, item, current_block, swap_label, miner_short) - if not tx_info.confirmed: + try_extend_reservation(self, item, current_block, swap_label, miner_short) continue - # Confirmed — compute hash and vote. Only drop the queued entry once the - # vote is accepted (or the contract tells us someone else already - # initiated it). On transient RPC/network failure we leave the entry in - # place so the next forward step retries instead of silently losing it. + # Only drop the queued entry once the vote is accepted (or the contract + # rejects it as already-initiated). Transient RPC failures leave the + # entry queued so the next forward step retries. 
try: miner_bytes = bytes.fromhex(Keypair(ss58_address=item.miner_hotkey).public_key.hex()) hash_input = scale_encode_initiate_hash_input( @@ -318,9 +188,6 @@ def initialize_pending_user_reservations(self: Validator) -> None: ) except ContractError as e: if is_contract_rejection(e): - # Contract rejected — in practice this means another validator - # already reached initiate quorum, so the entry is no longer - # actionable. Drop it. self.state_store.remove(item.miner_hotkey) bt.logging.info( f'PendingConfirm [{swap_label} {miner_short}]: contract rejected (likely already initiated)' @@ -331,13 +198,110 @@ def initialize_pending_user_reservations(self: Validator) -> None: bt.logging.error(f'PendingConfirm [{swap_label} {miner_short}]: unexpected error: {e}') +def try_extend_reservation( + self: Validator, + item: PendingConfirm, + current_block: int, + swap_label: str, + miner_short: str, +) -> None: + """Vote to extend reservation if nearing expiry, protecting users during provider outages.""" + from substrateinterface import Keypair + + try: + reserved_until = self.contract_client.get_miner_reserved_until(item.miner_hotkey) + if reserved_until >= current_block + EXTEND_THRESHOLD_BLOCKS: + return + + vote_key = (item.miner_hotkey, item.from_tx_hash) + voted_at = self.extend_reservation_voted_at.get(vote_key) + if voted_at is not None and reserved_until <= voted_at: + return # already voted under this reservation; contract hasn't extended yet + + miner_bytes = bytes.fromhex(Keypair(ss58_address=item.miner_hotkey).public_key.hex()) + extend_hash = keccak256(scale_encode_extend_hash_input(miner_bytes, item.from_tx_hash)) + self.contract_client.vote_extend_reservation( + wallet=self.wallet, + request_hash=extend_hash, + miner_hotkey=item.miner_hotkey, + from_tx_hash=item.from_tx_hash, + ) + self.extend_reservation_voted_at[vote_key] = reserved_until + bt.logging.info( + f'PendingConfirm [{swap_label} {miner_short}]: ' + f'voted to extend reservation ({reserved_until - 
current_block} blocks remaining)' + ) + except ContractError as e: + if 'AlreadyVoted' in str(e): + self.extend_reservation_voted_at[(item.miner_hotkey, item.from_tx_hash)] = ( + self.contract_client.get_miner_reserved_until(item.miner_hotkey) + ) + else: + bt.logging.debug(f'PendingConfirm [{swap_label} {miner_short}]: extend vote: {e}') + except Exception as e: + bt.logging.debug(f'PendingConfirm [{swap_label} {miner_short}]: extend check failed: {e}') + + +def poll_commitments(self: Validator) -> None: + """Read every miner commitment via one query_map RPC and persist diffs. + + Cost is one round-trip regardless of miner count, so per-block sampling + gives the crown-time series ~1-block accuracy. Event retention pruning + runs in the scoring round, not here. + """ + refresh_miner_rates(self) + purge_deregistered_hotkeys(self) + + +def refresh_miner_rates(self: Validator) -> None: + try: + pairs = read_miner_commitments(self.subtensor, self.config.netuid) + except Exception as e: + bt.logging.warning(f'Commitment poll failed: {e}') + return + + current_hotkeys = set(self.metagraph.hotkeys) + + for pair in pairs: + if pair.hotkey not in current_hotkeys: + continue + for from_c, to_c, r in ( + (pair.from_chain, pair.to_chain, pair.rate), + (pair.to_chain, pair.from_chain, pair.counter_rate), + ): + if r <= 0: + continue # miner opted out of this direction + key = (pair.hotkey, from_c, to_c) + if self.last_known_rates.get(key) == r: + continue + self.state_store.insert_rate_event( + hotkey=pair.hotkey, + from_chain=from_c, + to_chain=to_c, + rate=r, + block=self.block, + ) + self.last_known_rates[key] = r + + +def purge_deregistered_hotkeys(self: Validator) -> None: + current_hotkeys = set(self.metagraph.hotkeys) + stale = {hk for (hk, _, _) in self.last_known_rates.keys()} - current_hotkeys + if not stale: + return + for hk in stale: + self.state_store.delete_hotkey(hk) + self.last_known_rates = {k: v for k, v in self.last_known_rates.items() if k[0] not in stale} 
+ + async def confirm_miner_fulfillments( self: Validator, tracker: SwapTracker, verifier: SwapVerifier, current_block: int, ) -> Set[int]: - """Verify FULFILLED swaps; returns IDs where provider was unreachable so enforce_swap_timeouts skips them.""" + """Verify FULFILLED swaps and vote confirm. Returns swap IDs whose + provider was unreachable so the caller can skip them on timeout enforce.""" uncertain: Set[int] = set() fulfilled = [s for s in tracker.get_fulfilled(current_block) if not tracker.is_voted(s.id)] if not fulfilled: @@ -363,20 +327,18 @@ async def confirm_miner_fulfillments( def extend_fulfilled_near_timeout(self: Validator) -> None: - """Extend timeout for FULFILLED swaps where dest tx exists but isn't confirmed yet. - - Mirrors reservation extension logic: when a swap is nearing timeout but the - miner has sent the dest funds (tx visible on-chain), vote to extend the timeout - so the transaction has time to confirm. - """ + """Vote to extend timeout for FULFILLED swaps whose dest tx is visible + on-chain but not yet at min confirmations.""" tracker: SwapTracker = self.swap_tracker current_block = self.block - for swap in tracker.get_near_timeout_fulfilled(current_block, EXTEND_THRESHOLD_BLOCKS): + for swap in tracker.get_near_timeout_fulfilled(current_block): + if tracker.is_extend_timeout_voted(swap.id): + continue + swap_label = f'{swap.from_chain.upper()}->{swap.to_chain.upper()}' ctx = f'Swap #{swap.id} [{swap_label}]' - # Check if dest tx exists on-chain (even if unconfirmed) provider = self.chain_providers.get(swap.to_chain) if not provider or not swap.to_tx_hash: continue @@ -393,26 +355,27 @@ def extend_fulfilled_near_timeout(self: Validator) -> None: continue if tx_info is None: - continue # dest tx not found — don't extend, let it time out + continue # dest tx not found — let it time out - blocks_left = swap.timeout_block - current_block chain_def = provider.get_chain() log_on_change( f'dest_confs:{swap.id}', tx_info.confirmations, 
f'{ctx}: {tx_info.confirmations}/{chain_def.min_confirmations} dest confirmations, ' - f'{blocks_left} blocks until timeout', + f'{swap.timeout_block - current_block} blocks until timeout', ) - # Dest tx exists (confirmed or not) — vote to extend timeout try: - if voting.extend_swap_timeout(self.contract_client, self.wallet, swap.id): - bt.logging.info( - f'{ctx}: voted to extend timeout ' - f'({tx_info.confirmations}/{chain_def.min_confirmations} dest confirmations)' - ) + voting.extend_swap_timeout(self.contract_client, self.wallet, swap.id) + tracker.mark_extend_timeout_voted(swap.id) + bt.logging.info( + f'{ctx}: voted to extend timeout ' + f'({tx_info.confirmations}/{chain_def.min_confirmations} dest confirmations)' + ) except ContractError as e: - if 'AlreadyVoted' not in str(e) and not is_contract_rejection(e): + if 'AlreadyVoted' in str(e): + tracker.mark_extend_timeout_voted(swap.id) + elif not is_contract_rejection(e): bt.logging.debug(f'{ctx}: extend timeout vote: {e}') except Exception as e: bt.logging.debug(f'{ctx}: extend timeout failed: {e}') @@ -430,184 +393,3 @@ def enforce_swap_timeouts(self: Validator, tracker: SwapTracker, uncertain_swaps if voting.timeout_swap(self.contract_client, self.wallet, swap.id): tracker.resolve(swap.id, SwapStatus.TIMED_OUT, self.block) bt.logging.warning(f'Swap {swap.id}: timed out') - - -def run_scoring_pass(self: Validator) -> None: - """Run a V1 scoring pass and commit weights.""" - try: - rewards, miner_uids = calculate_miner_rewards(self) - if len(miner_uids) > 0 and len(rewards) > 0: - self.update_scores(rewards, miner_uids) - except Exception as e: - bt.logging.error(f'Scoring failed: {e}') - - -def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: - """Crown-time based reward computation. - - For each direction in ``DIRECTION_POOLS``: - 1. Replay rate events (from state_store) and collateral events (from - event_watcher) chronologically over the window - 2. 
At each block boundary, determine crown holders (tied best-rate miners - that are in the metagraph AND active on-chain AND have - collateral >= the event-watcher's cached ``min_collateral``) - 3. Accumulate crown_blocks per hotkey, splitting evenly on ties - 4. ``rewards[uid] += pool * (crown_blocks[hk] / total) * success_rate ** SUCCESS_EXPONENT`` - - Anything not distributed to miners recycles to ``RECYCLE_UID``. - """ - n_uids = self.metagraph.n.item() - if n_uids == 0: - return np.array([], dtype=np.float32), set() - - window_end = self.block - window_start = max(0, window_end - SCORING_WINDOW_BLOCKS) - - # Miners must be both in the metagraph (registered) AND active on the - # contract (miner_active == true). Active is sourced from MinerActivated - # events replayed by the watcher. - in_metagraph: Set[str] = set(self.metagraph.hotkeys) - eligible_hotkeys: Set[str] = in_metagraph & self.event_watcher.active_miners - hotkey_to_uid: Dict[str, int] = {self.metagraph.hotkeys[uid]: uid for uid in range(n_uids)} - - rewards = np.zeros(n_uids, dtype=np.float32) - success_stats = self.state_store.get_all_time_success_rates() - min_collateral = int(self.event_watcher.min_collateral or 0) - - for (from_chain, to_chain), pool in DIRECTION_POOLS.items(): - crown_blocks = replay_crown_time_window( - store=self.state_store, - event_watcher=self.event_watcher, - from_chain=from_chain, - to_chain=to_chain, - window_start=window_start, - window_end=window_end, - eligible_hotkeys=eligible_hotkeys, - min_collateral=min_collateral, - ) - total = sum(crown_blocks.values()) - if total == 0: - continue # empty bucket — pool recycles via the remainder below - - for hotkey, blocks in crown_blocks.items(): - uid = hotkey_to_uid.get(hotkey) - if uid is None: - continue # dereg'd mid-window; credit forfeited - share = blocks / total - sr = success_rate(success_stats.get(hotkey)) - rewards[uid] += pool * share * (sr**SUCCESS_EXPONENT) - - recycle_uid = RECYCLE_UID if RECYCLE_UID < n_uids 
else 0 - distributed = float(rewards.sum()) - rewards[recycle_uid] += max(0.0, 1.0 - distributed) - - bt.logging.info( - f'V1 scoring: window=[{window_start}, {window_end}], ' - f'distributed={distributed:.6f}, recycled={max(0.0, 1.0 - distributed):.6f}' - ) - - return rewards, set(range(n_uids)) - - -def success_rate(stats: Optional[Tuple[int, int]]) -> float: - """All-time success rate. Zero-outcome miners default to 1.0 (optimistic).""" - if stats is None: - return 1.0 - completed, timed_out = stats - total = completed + timed_out - if total == 0: - return 1.0 - return completed / total - - -def replay_crown_time_window( - store: ValidatorStateStore, - event_watcher, - from_chain: str, - to_chain: str, - window_start: int, - window_end: int, - eligible_hotkeys: Set[str], - min_collateral: int, -) -> Dict[str, float]: - """Walk the merged rate + collateral event stream, accumulate crown blocks. - - Rates come from ``store`` (populated by commitment polling). Collateral - history comes from ``event_watcher`` (populated by contract event replay). - Returns ``{hotkey: crown_blocks_float}``. Ties split credit evenly across - the tied interval. - """ - # 1. Reconstruct state at window_start for every eligible hotkey. - current_rates: Dict[str, float] = {} - current_collateral: Dict[str, int] = {} - - for hotkey in eligible_hotkeys: - latest_rate = store.get_latest_rate_before(hotkey, from_chain, to_chain, window_start) - if latest_rate is not None: - current_rates[hotkey] = latest_rate[0] - latest_col = event_watcher.get_latest_collateral_before(hotkey, window_start) - if latest_col is not None: - current_collateral[hotkey] = latest_col[0] - else: - # No event before window_start — fall back to the watcher's - # current value so a miner whose only collateral event predates - # the retention window still gets credited accurately. - snapshot = event_watcher.collateral.get(hotkey) - if snapshot is not None: - current_collateral[hotkey] = snapshot - - # 2. 
Merge rate and collateral events within the window, oldest first. - # Collateral events sort BEFORE rate events at the same block so a - # simultaneous "collateral drops + best rate" transition resolves to - # the post-drop state before rate attribution. - rate_events = store.get_rate_events_in_range(from_chain, to_chain, window_start, window_end) - col_events = event_watcher.get_collateral_events_in_range(window_start, window_end) - - merged: List[Tuple[int, int, str, str, float]] = [] - for e in rate_events: - merged.append((e['block'], 1, 'rate', e['hotkey'], float(e['rate']))) - for e in col_events: - merged.append((e['block'], 0, 'collateral', e['hotkey'], float(e['collateral_rao']))) - merged.sort(key=lambda x: (x[0], x[1])) - - # 3. Walk intervals, crediting current holders. - crown_blocks: Dict[str, float] = {} - prev_block = window_start - - def attribute(interval_start: int, interval_end: int) -> None: - duration = interval_end - interval_start - if duration <= 0: - return - holders = crown_holders_at_instant(current_rates, current_collateral, min_collateral, eligible_hotkeys) - if not holders: - return - split = duration / len(holders) - for hk in holders: - crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split - - for block, _order, kind, hotkey, value in merged: - attribute(prev_block, block) - if kind == 'rate': - current_rates[hotkey] = value - else: - current_collateral[hotkey] = int(value) - prev_block = block - - attribute(prev_block, window_end) - return crown_blocks - - -def crown_holders_at_instant( - rates: Dict[str, float], - collaterals: Dict[str, int], - min_collateral: int, - eligible: Set[str], -) -> List[str]: - """Hotkeys tied for best rate, with collateral >= min and eligible.""" - candidates = { - hk: r for hk, r in rates.items() if hk in eligible and collaterals.get(hk, 0) >= min_collateral and r > 0 - } - if not candidates: - return [] - best = max(candidates.values()) - return [hk for hk, r in candidates.items() if r == best] diff 
--git a/allways/validator/scoring.py b/allways/validator/scoring.py new file mode 100644 index 0000000..37625ec --- /dev/null +++ b/allways/validator/scoring.py @@ -0,0 +1,297 @@ +"""Crown-time scoring pipeline. + +Reward per miner is ``pool * share * success_rate ** SUCCESS_EXPONENT``; +unclaimed pool recycles to ``RECYCLE_UID``. Entry point is +``score_and_reward_miners(validator)``. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import IntEnum +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple + +import bittensor as bt +import numpy as np + +from allways.constants import ( + CREDIBILITY_WINDOW_BLOCKS, + DIRECTION_POOLS, + RECYCLE_UID, + SCORING_WINDOW_BLOCKS, + SUCCESS_EXPONENT, +) +from allways.validator.event_watcher import ContractEventWatcher +from allways.validator.state_store import ValidatorStateStore + +if TYPE_CHECKING: + from neurons.validator import Validator + + +def score_and_reward_miners(self: Validator) -> None: + try: + rewards, miner_uids = calculate_miner_rewards(self) + self.update_scores(rewards, miner_uids) + prune_rate_events(self) + prune_swap_outcomes(self) + except Exception as e: + bt.logging.error(f'Scoring failed: {e}') + + +def prune_rate_events(self: Validator) -> None: + cutoff = self.block - SCORING_WINDOW_BLOCKS + if cutoff > 0: + self.state_store.prune_events_older_than(cutoff) + + +def prune_swap_outcomes(self: Validator) -> None: + cutoff = self.block - CREDIBILITY_WINDOW_BLOCKS + if cutoff > 0: + self.state_store.prune_swap_outcomes_older_than(cutoff) + + +def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: + """Replay the crown-time event stream over the window, derive per-miner + rewards, recycle any undistributed pool to ``RECYCLE_UID``.""" + n_uids = self.metagraph.n.item() + if n_uids == 0: + return np.array([], dtype=np.float32), set() + + window_end = self.block + window_start = max(0, window_end - SCORING_WINDOW_BLOCKS) + + 
in_metagraph: Set[str] = set(self.metagraph.hotkeys)
+        eligible_hotkeys: Set[str] = in_metagraph & self.event_watcher.active_miners
+        hotkey_to_uid: Dict[str, int] = {self.metagraph.hotkeys[uid]: uid for uid in range(n_uids)}
+
+        rewards = np.zeros(n_uids, dtype=np.float32)
+        credibility_since = max(0, self.block - CREDIBILITY_WINDOW_BLOCKS)
+        success_stats = self.state_store.get_success_rates_since(credibility_since)
+        min_collateral = int(self.event_watcher.min_collateral or 0)
+
+        for (from_chain, to_chain), pool in DIRECTION_POOLS.items():
+            crown_blocks = replay_crown_time_window(
+                store=self.state_store,
+                event_watcher=self.event_watcher,
+                from_chain=from_chain,
+                to_chain=to_chain,
+                window_start=window_start,
+                window_end=window_end,
+                eligible_hotkeys=eligible_hotkeys,
+                min_collateral=min_collateral,
+            )
+            total = sum(crown_blocks.values())
+            if total == 0:
+                continue  # empty bucket — pool recycles via the remainder below
+
+            for hotkey, blocks in crown_blocks.items():
+                uid = hotkey_to_uid.get(hotkey)
+                if uid is None:
+                    continue  # dereg'd mid-window; credit forfeited
+                share = blocks / total
+                sr = success_rate(success_stats.get(hotkey))
+                rewards[uid] += pool * share * (sr**SUCCESS_EXPONENT)
+
+        recycle_uid = RECYCLE_UID if RECYCLE_UID < n_uids else 0
+        distributed = float(rewards.sum())
+        rewards[recycle_uid] += max(0.0, 1.0 - distributed)
+
+        bt.logging.info(
+            f'V1 scoring: window=[{window_start}, {window_end}], '
+            f'distributed={distributed:.6f}, recycled={max(0.0, 1.0 - distributed):.6f}'
+        )
+
+        return rewards, set(range(n_uids))
+
+
+def success_rate(stats: Optional[Tuple[int, int]]) -> float:
+    """Success rate over the credibility window. 
Zero-outcome miners default to 1.0 (optimistic).""" + if stats is None: + return 1.0 + completed, timed_out = stats + total = completed + timed_out + if total == 0: + return 1.0 + return completed / total + + +# ─── Crown-time replay ─────────────────────────────────────────────────── + + +class EventKind(IntEnum): + """Ordering of coincident-block transitions in the crown-time replay. + + At the same block number, busy transitions apply before collateral + changes, which apply before rate changes. So if a user reserves a miner + in the same block that miner's best rate was posted, the reservation + ends crown credit *before* the rate attribution — matching the intent + that a busy miner doesn't earn a new interval. + """ + + BUSY = 0 + COLLATERAL = 1 + RATE = 2 + + +@dataclass +class ReplayEvent: + """One transition in the chronological replay stream. ``value`` is + polymorphic on ``kind``: rate as float, collateral as rao, or busy delta + of ±1.""" + + block: int + hotkey: str + kind: EventKind + value: float + + @property + def sort_key(self) -> Tuple[int, int]: + return (self.block, int(self.kind)) + + +def reconstruct_window_start_state( + store: ValidatorStateStore, + event_watcher: ContractEventWatcher, + from_chain: str, + to_chain: str, + window_start: int, + eligible_hotkeys: Set[str], +) -> Tuple[Dict[str, float], Dict[str, int], Dict[str, int]]: + """Snapshot rates, collateral, and busy counts as they stood at window_start.""" + rates: Dict[str, float] = {} + collateral: Dict[str, int] = {} + busy_count: Dict[str, int] = dict(event_watcher.get_busy_miners_at(window_start)) + + for hotkey in eligible_hotkeys: + latest_rate = store.get_latest_rate_before(hotkey, from_chain, to_chain, window_start) + if latest_rate is not None: + rates[hotkey] = latest_rate[0] + + latest_col = event_watcher.get_latest_collateral_before(hotkey, window_start) + if latest_col is not None: + collateral[hotkey] = latest_col[0] + else: + # No event before window_start — fall 
back to the watcher's current + # snapshot so a miner whose only collateral event predates retention + # still gets credited accurately. + snapshot = event_watcher.collateral.get(hotkey) + if snapshot is not None: + collateral[hotkey] = snapshot + + return rates, collateral, busy_count + + +def merge_replay_events( + store: ValidatorStateStore, + event_watcher: ContractEventWatcher, + from_chain: str, + to_chain: str, + window_start: int, + window_end: int, +) -> List[ReplayEvent]: + """Merge in-window rate, collateral, and busy transitions into one + chronologically-sorted stream.""" + events: List[ReplayEvent] = [] + + for e in event_watcher.get_busy_events_in_range(window_start, window_end): + events.append(ReplayEvent(block=e['block'], hotkey=e['hotkey'], kind=EventKind.BUSY, value=float(e['delta']))) + + for e in event_watcher.get_collateral_events_in_range(window_start, window_end): + events.append( + ReplayEvent( + block=e['block'], hotkey=e['hotkey'], kind=EventKind.COLLATERAL, value=float(e['collateral_rao']) + ) + ) + + for e in store.get_rate_events_in_range(from_chain, to_chain, window_start, window_end): + events.append(ReplayEvent(block=e['block'], hotkey=e['hotkey'], kind=EventKind.RATE, value=float(e['rate']))) + + events.sort(key=lambda ev: ev.sort_key) + return events + + +def replay_crown_time_window( + store: ValidatorStateStore, + event_watcher: ContractEventWatcher, + from_chain: str, + to_chain: str, + window_start: int, + window_end: int, + eligible_hotkeys: Set[str], + min_collateral: int, +) -> Dict[str, float]: + """Walk the merged event stream, return ``{hotkey: crown_blocks_float}``. 
+ Ties at the same rate split credit evenly; busy miners are excluded so + credit flows to the next-best idle miner.""" + rates, collateral, busy_count = reconstruct_window_start_state( + store, event_watcher, from_chain, to_chain, window_start, eligible_hotkeys + ) + replay_events = merge_replay_events(store, event_watcher, from_chain, to_chain, window_start, window_end) + + crown_blocks: Dict[str, float] = {} + prev_block = window_start + + def credit_interval(interval_start: int, interval_end: int) -> None: + duration = interval_end - interval_start + if duration <= 0: + return + busy_set = {hk for hk, c in busy_count.items() if c > 0} + holders = crown_holders_at_instant(rates, collateral, min_collateral, eligible_hotkeys, busy=busy_set) + if not holders: + return + split = duration / len(holders) + for hk in holders: + crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split + + def apply_event(event: ReplayEvent) -> None: + if event.kind is EventKind.RATE: + rates[event.hotkey] = event.value + elif event.kind is EventKind.COLLATERAL: + collateral[event.hotkey] = int(event.value) + else: # BUSY + new_count = busy_count.get(event.hotkey, 0) + int(event.value) + if new_count > 0: + busy_count[event.hotkey] = new_count + else: + busy_count.pop(event.hotkey, None) + + for event in replay_events: + credit_interval(prev_block, event.block) + apply_event(event) + prev_block = event.block + + credit_interval(prev_block, window_end) + return crown_blocks + + +def crown_holders_at_instant( + rates: Dict[str, float], + collaterals: Dict[str, int], + min_collateral: int, + eligible: Set[str], + busy: Optional[Set[str]] = None, +) -> List[str]: + """Take the miners posting the best rate, but only if they satisfy every + other condition (eligible, not busy, collateral >= min, rate > 0). 
If the + best rate has no qualified miner, fall through to the next-best rate.""" + busy = busy or set() + + def qualifies(hotkey: str) -> bool: + return ( + hotkey in eligible + and hotkey not in busy + and collaterals.get(hotkey, 0) >= min_collateral + and rates.get(hotkey, 0) > 0 + ) + + by_rate: Dict[float, List[str]] = {} + for hotkey, rate in rates.items(): + if rate > 0: + by_rate.setdefault(rate, []).append(hotkey) + + for rate in sorted(by_rate, reverse=True): + winners = [hk for hk in by_rate[rate] if qualifies(hk)] + if winners: + return winners + + return [] diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index f7be242..07b8ed6 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -1,23 +1,11 @@ -"""Single SQLite-backed store for all validator-local state. - -Consolidates what used to be two files (``rate_state.db`` + ``pending_confirms.db``) -into one ``state.db`` with a single connection, a single lock, and one class -holding every table the validator owns: - -- ``pending_confirms`` — user swap confirmations awaiting tx confirmations; - written by axon handler thread, drained by forward loop thread. -- ``rate_events`` — per-miner per-direction rate history used by the V1 - crown-time scoring replay. Bounded by ``RATE_UPDATE_MIN_INTERVAL_BLOCKS`` - throttle on insert and ``EVENT_RETENTION_BLOCKS`` on prune. -- ``collateral_events`` — per-miner collateral history. One row per observed - change. Pruned alongside rate_events. -- ``swap_outcomes`` — all-time credibility ledger keyed by ``swap_id``. Never - time-pruned; only removed when a hotkey deregisters. Read during scoring - to compute ``success_rate ** SUCCESS_EXPONENT``. - -Threading: one ``sqlite3.Connection`` opened with ``check_same_thread=False`` -behind a ``threading.Lock``. ``busy_timeout`` set before ``journal_mode=WAL`` -so concurrent openers wait on the init lock instead of erroring out. 
+"""SQLite-backed store for all validator-local state. + +Tables: ``pending_confirms`` (axon→forward queue), ``rate_events`` (crown-time +input), ``swap_outcomes`` (credibility ledger). Single connection guarded by +one lock; opened with ``check_same_thread=False``. ``busy_timeout`` is set +before ``journal_mode=WAL`` because the WAL flip takes a brief exclusive lock +that concurrent openers would otherwise hit as "database is locked" — the +local dev env runs two validators against the same file. """ import sqlite3 @@ -27,12 +15,10 @@ from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple -from allways.constants import RATE_UPDATE_MIN_INTERVAL_BLOCKS - @dataclass class PendingConfirm: - """All data needed to call ``vote_initiate`` once tx confirmations are met.""" + """All data needed to call ``vote_initiate`` once tx confirmations land.""" miner_hotkey: str from_tx_hash: str @@ -51,8 +37,6 @@ class PendingConfirm: class ValidatorStateStore: - """Single-connection SQLite store owning every validator-local table.""" - def __init__( self, db_path: Path | str | None = None, @@ -62,10 +46,9 @@ def __init__( self.db_path.parent.mkdir(parents=True, exist_ok=True) self.lock = threading.Lock() self.conn: Optional[sqlite3.Connection] = sqlite3.connect(self.db_path, check_same_thread=False) - # busy_timeout must be set BEFORE journal_mode: setting WAL mode takes a - # brief exclusive lock that a concurrent opener will otherwise hit as an - # immediate "database is locked" error (dev env runs two validators - # against the same SQLite file). + # busy_timeout must be set before journal_mode: the WAL switch takes a + # brief exclusive lock that a concurrent opener would otherwise hit as + # an immediate "database is locked" error. 
self.conn.execute('PRAGMA busy_timeout=5000') self.conn.execute('PRAGMA journal_mode=WAL') self.conn.row_factory = sqlite3.Row @@ -75,7 +58,6 @@ def __init__( # ─── pending_confirms ─────────────────────────────────────────────── def enqueue(self, item: PendingConfirm) -> None: - """Add or replace a pending confirm. Keyed by ``miner_hotkey``.""" with self.lock: conn = self.require_connection() conn.execute( @@ -107,19 +89,14 @@ def enqueue(self, item: PendingConfirm) -> None: conn.commit() def get_all(self) -> List[PendingConfirm]: - """Return a snapshot of all pending items, oldest first. - - Read-only — does not purge expired entries. Call ``purge_expired`` - explicitly from the forward loop once per tick instead of side- - effecting every read. - """ + """Snapshot of pending items, oldest first. Does not purge expired + entries — call ``purge_expired_pending_confirms`` explicitly.""" with self.lock: conn = self.require_connection() rows = conn.execute('SELECT * FROM pending_confirms ORDER BY queued_at').fetchall() return [self.row_to_pending(row) for row in rows] def remove(self, miner_hotkey: str) -> Optional[PendingConfirm]: - """Remove and return a specific entry.""" with self.lock: conn = self.require_connection() row = conn.execute( @@ -147,12 +124,8 @@ def pending_size(self) -> int: count = conn.execute('SELECT COUNT(*) FROM pending_confirms').fetchone()[0] return int(count) - def purge_expired_pending(self) -> int: - """Drop pending confirms whose reservation has already expired. - - Returns the number of rows removed. Meant to be called once per - forward-loop tick — the forward loop knows when it's safe to mutate. 
- """ + def purge_expired_pending_confirms(self) -> int: + """Drop pending confirms whose reservation has already expired.""" if self.current_block_fn is None: return 0 current_block = self.current_block_fn() @@ -191,23 +164,20 @@ def insert_rate_event( rate: float, block: int, ) -> bool: - """Insert a rate event if throttle + change conditions pass.""" + """Insert a rate event, skipping same-rate duplicates.""" with self.lock: conn = self.require_connection() row = conn.execute( """ - SELECT rate, block FROM rate_events + SELECT rate FROM rate_events WHERE hotkey = ? AND from_chain = ? AND to_chain = ? ORDER BY block DESC, id DESC LIMIT 1 """, (hotkey, from_chain, to_chain), ).fetchone() - if row is not None: - if block - row['block'] < RATE_UPDATE_MIN_INTERVAL_BLOCKS: - return False - if row['rate'] == rate: - return False + if row is not None and row['rate'] == rate: + return False conn.execute( 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', (hotkey, from_chain, to_chain, rate, block), @@ -222,7 +192,6 @@ def get_latest_rate_before( to_chain: str, block: int, ) -> Optional[Tuple[float, int]]: - """Most recent rate for ``hotkey``+direction at or before ``block``.""" with self.lock: conn = self.require_connection() row = conn.execute( @@ -267,7 +236,6 @@ def insert_swap_outcome( completed: bool, resolved_block: int, ) -> None: - """Insert or replace a swap outcome row. 
Idempotent on ``swap_id``.""" with self.lock: conn = self.require_connection() conn.execute( @@ -279,8 +247,9 @@ def insert_swap_outcome( ) conn.commit() - def get_all_time_success_rates(self) -> Dict[str, Tuple[int, int]]: - """Return ``{hotkey: (completed_count, timed_out_count)}`` over all outcomes.""" + def get_success_rates_since(self, since_block: int) -> Dict[str, Tuple[int, int]]: + """Return ``{hotkey: (completed_count, timed_out_count)}`` for outcomes + resolved at or after ``since_block``.""" with self.lock: conn = self.require_connection() rows = conn.execute( @@ -289,15 +258,24 @@ def get_all_time_success_rates(self) -> Dict[str, Tuple[int, int]]: SUM(completed) AS completed, SUM(1 - completed) AS timed_out FROM swap_outcomes + WHERE resolved_block >= ? GROUP BY miner_hotkey - """ + """, + (since_block,), ).fetchall() return {r['miner_hotkey']: (int(r['completed']), int(r['timed_out'])) for r in rows} + def prune_swap_outcomes_older_than(self, cutoff_block: int) -> None: + if cutoff_block <= 0: + return + with self.lock: + conn = self.require_connection() + conn.execute('DELETE FROM swap_outcomes WHERE resolved_block < ?', (cutoff_block,)) + conn.commit() + # ─── cross-table maintenance ──────────────────────────────────────── def delete_hotkey(self, hotkey: str) -> None: - """Dereg purge: remove the hotkey from rate/outcomes tables.""" with self.lock: conn = self.require_connection() conn.execute('DELETE FROM rate_events WHERE hotkey = ?', (hotkey,)) @@ -305,14 +283,22 @@ def delete_hotkey(self, hotkey: str) -> None: conn.commit() def prune_events_older_than(self, cutoff_block: int) -> None: - """Delete rate events older than ``cutoff_block``. - - Never touches ``swap_outcomes`` or ``pending_confirms`` — those have - their own lifetimes. 
- """ + """Delete rate events older than ``cutoff_block``, preserving the + latest row per ``(hotkey, from_chain, to_chain)`` as a state- + reconstruction anchor for ``get_latest_rate_before(window_start)``.""" with self.lock: conn = self.require_connection() - conn.execute('DELETE FROM rate_events WHERE block < ?', (cutoff_block,)) + conn.execute( + """ + DELETE FROM rate_events + WHERE block < ? + AND id NOT IN ( + SELECT MAX(id) FROM rate_events + GROUP BY hotkey, from_chain, to_chain + ) + """, + (cutoff_block,), + ) conn.commit() def close(self) -> None: @@ -371,6 +357,8 @@ def init_db(self) -> None: ); CREATE INDEX IF NOT EXISTS idx_swap_outcomes_hotkey ON swap_outcomes(miner_hotkey); + CREATE INDEX IF NOT EXISTS idx_swap_outcomes_resolved_block + ON swap_outcomes(resolved_block); """ ) conn.commit() diff --git a/allways/validator/swap_tracker.py b/allways/validator/swap_tracker.py index 422cdc2..7200a17 100644 --- a/allways/validator/swap_tracker.py +++ b/allways/validator/swap_tracker.py @@ -1,11 +1,6 @@ -"""Incremental swap lifecycle tracker. Eliminates O(N) full scans. - -Swap outcomes (credibility ledger writes) are owned by -``ContractEventWatcher``, which replays ``SwapCompleted`` / ``SwapTimedOut`` -events into ``state_store.swap_outcomes``. The tracker here just maintains -the in-memory active set so the forward loop knows what to verify, vote on, -and time out. -""" +"""Incremental swap lifecycle tracker. Maintains the in-memory active set +so the forward loop knows what to verify, vote on, and time out. 
Swap +outcomes (credibility ledger writes) are owned by ``ContractEventWatcher``.""" import asyncio from typing import Dict, List, Set @@ -13,23 +8,19 @@ import bittensor as bt from allways.classes import Swap, SwapStatus +from allways.constants import EXTEND_THRESHOLD_BLOCKS from allways.contract_client import AllwaysContractClient ACTIVE_STATUSES = (SwapStatus.ACTIVE, SwapStatus.FULFILLED) -# How many consecutive ``get_swap == None`` polls we tolerate before dropping -# a swap from the active set. Tolerates transient RPC flakes without the -# fragile timeout-block inference the V1 tracker used. +# Consecutive None polls tolerated before treating a swap as resolved. Smooths +# RPC flakes without the fragile timeout-block inference the V1 tracker used. NULL_SWAP_RETRY_LIMIT = 3 class SwapTracker: - """Tracks swap lifecycle incrementally. No full scans after initialization. - - Two layers: - - Discovery: scan only NEW swap IDs since last poll - - Monitoring: re-fetch all tracked ACTIVE/FULFILLED swaps each poll - """ + """Discovery scans new swap IDs since the last poll; monitoring re-fetches + all tracked ACTIVE/FULFILLED swaps each poll.""" def __init__( self, @@ -40,11 +31,15 @@ def __init__( self.last_scanned_id = 0 self.active: Dict[int, Swap] = {} self.voted_ids: Set[int] = set() + # swap_id → timeout_block at vote time. ``is_extend_timeout_voted`` + # auto-clears the entry once the contract has bumped the swap past + # the voted value so the next extension round can vote again. 
+ self.extend_timeout_voted_at: Dict[int, int] = {} self.null_retry_count: Dict[int, int] = {} self.fulfillment_timeout_blocks = fulfillment_timeout_blocks def initialize(self, current_block: int): - """Cold start — scan backward from latest swap to populate active set.""" + """Cold start: scan backward from latest swap to seed active set.""" next_id = self.client.get_next_swap_id() if next_id <= 1: self.last_scanned_id = 0 @@ -74,29 +69,41 @@ def initialize(self, current_block: int): bt.logging.info(f'SwapTracker initialized: active={len(self.active)}, last_scanned_id={self.last_scanned_id}') def resolve(self, swap_id: int, status: SwapStatus, block: int): - """Drop a swap from active tracking after this validator's vote reached quorum. - - Outcome persistence is the event watcher's job — we only manage the - in-memory active set here. - """ + """Drop a swap from tracking after our vote reached quorum.""" swap = self.active.pop(swap_id, None) if swap is None: return swap.status = status swap.completed_block = block self.voted_ids.discard(swap_id) + self.extend_timeout_voted_at.pop(swap_id, None) self.null_retry_count.pop(swap_id, None) def mark_voted(self, swap_id: int): - """Mark a swap as voted on to prevent redundant vote extrinsics.""" + """Mark a swap as voted on to prevent redundant confirm/timeout extrinsics.""" self.voted_ids.add(swap_id) def is_voted(self, swap_id: int) -> bool: - """Check if we've already voted on this swap.""" return swap_id in self.voted_ids + def mark_extend_timeout_voted(self, swap_id: int) -> None: + swap = self.active.get(swap_id) + if swap is not None: + self.extend_timeout_voted_at[swap_id] = swap.timeout_block + + def is_extend_timeout_voted(self, swap_id: int) -> bool: + voted_at = self.extend_timeout_voted_at.get(swap_id) + if voted_at is None: + return False + swap = self.active.get(swap_id) + if swap is not None and swap.timeout_block > voted_at: + # contract extended the swap → vote opens again for the next round + 
self.extend_timeout_voted_at.pop(swap_id, None) + return False + return True + async def poll(self, current_block: int = 0): - """Incremental update — called every forward step (~12s).""" + """Incremental refresh — called every forward step.""" try: await self.poll_inner() except (ConnectionError, TimeoutError, asyncio.TimeoutError) as e: @@ -112,8 +119,7 @@ async def poll_inner(self): fresh: Set[int] = set() new_ids = list(range(self.last_scanned_id + 1, next_id)) if new_ids: - # return_exceptions=True so a single flaky get_swap doesn't abort - # the whole discovery pass and kill the forward step. + # return_exceptions=True keeps one flaky get_swap from killing the step. swaps = await asyncio.gather( *[asyncio.to_thread(self.client.get_swap, sid) for sid in new_ids], return_exceptions=True, @@ -146,31 +152,20 @@ async def poll_inner(self): return_exceptions=True, ) + # Null and transient errors share one retry policy — a missing swap + # is either an RPC flake or a freshly-resolved entry the event + # watcher will record. Retry a few times, then drop. resolved_ids: List[int] = [] for sid, result in zip(stale_ids, swaps): if isinstance(result, Exception): bt.logging.debug(f'SwapTracker: get_swap({sid}) failed during refresh: {result}') - # Treat a transient error like a null return — bump the retry - # counter, drop after NULL_SWAP_RETRY_LIMIT failures. - retries = self.null_retry_count.get(sid, 0) + 1 - if retries >= NULL_SWAP_RETRY_LIMIT: - resolved_ids.append(sid) - else: - self.null_retry_count[sid] = retries - continue - swap = result - if swap is None: - # Contract returned None. Either the swap resolved and was - # removed from contract storage, or an RPC flake. Retry a few - # times before dropping — event watcher will write the outcome - # when it replays the SwapCompleted/SwapTimedOut events. 
- retries = self.null_retry_count.get(sid, 0) + 1 - if retries >= NULL_SWAP_RETRY_LIMIT: + result = None + + if result is None: + if self.bump_null_retry(sid): resolved_ids.append(sid) - else: - self.null_retry_count[sid] = retries - elif swap.status in ACTIVE_STATUSES: - self.active[sid] = swap + elif result.status in ACTIVE_STATUSES: + self.active[sid] = result self.null_retry_count.pop(sid, None) else: resolved_ids.append(sid) @@ -185,17 +180,24 @@ async def poll_inner(self): self.prune_stale_voted_ids() - def prune_stale_voted_ids(self) -> None: - """Drop any voted_ids entries whose swap is no longer being tracked. + def bump_null_retry(self, swap_id: int) -> bool: + """Returns True when the retry limit is hit and the caller should + treat the swap as resolved.""" + retries = self.null_retry_count.get(swap_id, 0) + 1 + if retries >= NULL_SWAP_RETRY_LIMIT: + return True + self.null_retry_count[swap_id] = retries + return False - voted_ids is normally cleaned up alongside active.pop() in resolve() - and the refresh loop, but an exceptional path (e.g. active.pop raced - with a manual test fixture) can leave orphans. Cap the set to prevent - unbounded growth. - """ - orphans = self.voted_ids - set(self.active.keys()) - if orphans: - self.voted_ids -= orphans + def prune_stale_voted_ids(self) -> None: + """Drop any voted state for swaps no longer being tracked. Normally + handled inline in ``resolve``/refresh, but an exceptional path (e.g. 
+ active.pop raced by a fixture) can leave orphans.""" + active_ids = set(self.active.keys()) + self.voted_ids -= self.voted_ids - active_ids + for sid in list(self.extend_timeout_voted_at.keys()): + if sid not in active_ids: + del self.extend_timeout_voted_at[sid] def get_fulfilled(self, current_block: int) -> List[Swap]: """Active FULFILLED swaps not yet past timeout (ready for verification).""" @@ -205,12 +207,14 @@ def get_fulfilled(self, current_block: int) -> List[Swap]: if s.status == SwapStatus.FULFILLED and (s.timeout_block == 0 or current_block <= s.timeout_block) ] - def get_near_timeout_fulfilled(self, current_block: int, threshold: int) -> List[Swap]: - """FULFILLED swaps approaching timeout (within threshold blocks).""" + def get_near_timeout_fulfilled(self, current_block: int) -> List[Swap]: + """FULFILLED swaps within EXTEND_THRESHOLD_BLOCKS of their timeout.""" return [ s for s in self.active.values() - if s.status == SwapStatus.FULFILLED and s.timeout_block > 0 and current_block >= s.timeout_block - threshold + if s.status == SwapStatus.FULFILLED + and s.timeout_block > 0 + and current_block >= s.timeout_block - EXTEND_THRESHOLD_BLOCKS ] def get_timed_out(self, current_block: int) -> List[Swap]: diff --git a/allways/validator/voting.py b/allways/validator/voting.py index 3b6ac40..0ed0cfc 100644 --- a/allways/validator/voting.py +++ b/allways/validator/voting.py @@ -1,12 +1,6 @@ -"""Thin wrappers around the 3 vote extrinsics a validator sends on swaps. - -All three functions return ``bool`` — True on success, False on any error -(contract rejection, RPC failure, etc.). The caller logs the outcome and -uses the bool to decide whether to advance local state. - -Keep these as module-level functions, not a class — they're pure -transformations of (client, wallet, swap_id) → bool with no retained state. -""" +"""Thin wrappers around the swap-vote extrinsics. 
confirm_swap and +timeout_swap return False on any error; extend_swap_timeout propagates so +the caller can distinguish ``AlreadyVoted`` from real failures.""" import bittensor as bt @@ -36,13 +30,7 @@ def timeout_swap(client: AllwaysContractClient, wallet: bt.Wallet, swap_id: int) def extend_swap_timeout(client: AllwaysContractClient, wallet: bt.Wallet, swap_id: int) -> bool: - """Vote to extend a FULFILLED swap's deadline when the dest tx needs - more confirmations before timeout fires. - - Unlike confirm/timeout, this function lets exceptions propagate so - callers can distinguish expected rejections (``AlreadyVoted``, - ``ContractReverted``) from real errors. Returning True here means the - extrinsic was accepted, not that the swap is actually extended. - """ + """Vote to extend a FULFILLED swap's deadline. Lets exceptions propagate + so the caller can distinguish ``AlreadyVoted`` from real errors.""" client.vote_extend_timeout(wallet=wallet, swap_id=swap_id) return True diff --git a/neurons/validator.py b/neurons/validator.py index 18ab670..a1c5a49 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -70,7 +70,11 @@ def __init__(self, config=None): # so pending_confirms can purge expired reservations lazily on read. self.state_store = ValidatorStateStore(current_block_fn=lambda: self.block) self.last_known_rates: dict[tuple[str, str, str], float] = {} - self.last_commitment_poll_block: int = 0 + # (miner_hotkey, from_tx_hash) → reserved_until at vote time. Skips + # redundant vote_extend_reservation extrinsics — auto-clears once the + # contract bumps reserved_until past the voted value, so the next + # extension round is open. + self.extend_reservation_voted_at: dict[tuple[str, str], int] = {} # Event-sourced miner state. Replaces the old _poll_collaterals + # _refresh_min_collateral polling loops. 
``sync_to(current_block)`` diff --git a/tests/test_commitments.py b/tests/test_commitments.py index 1cfa250..ba8bba1 100644 --- a/tests/test_commitments.py +++ b/tests/test_commitments.py @@ -1,6 +1,9 @@ -"""Tests for allways.commitments — commitment string parsing.""" +"""Tests for allways.commitments — commitment string parsing + query_map read.""" -from allways.commitments import parse_commitment_data +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +from allways.commitments import parse_commitment_data, read_miner_commitments class TestParseCommitmentData: @@ -159,3 +162,89 @@ def test_default_uid_and_hotkey(self): def test_same_chain(self): assert parse_commitment_data('v1:btc:addr:btc:addr:340:350') is None + + +class TestReadMinerCommitmentsQueryMap: + """Coverage for the query_map-batched read. + + ``read_miner_commitments`` used to do N separate ``substrate.query`` calls + in a for-loop. It now uses ``substrate.query_map`` to pull every + ``(hotkey, commitment)`` pair under ``Commitments.CommitmentOf(netuid)`` + in a single RPC. These tests mock the substrate interface to exercise + the new path — the hotkey→uid filter, the decode fallthrough, and the + "commitment exists but miner dereg'd" dropout. + """ + + def make_subtensor(self, hotkeys: list[str], rows: list[tuple[str, str]]) -> MagicMock: + """Build a mock subtensor whose metagraph and query_map match the args. + + ``rows`` is a list of (hotkey, raw_commitment_text) pairs as they'd + come back from Commitments.CommitmentOf. Each raw text is wrapped in + a fake metadata object that the real ``decode_commitment_field`` + can parse. 
+ """ + subtensor = MagicMock() + metagraph = SimpleNamespace( + hotkeys=list(hotkeys), + n=SimpleNamespace(item=lambda: len(hotkeys)), + ) + subtensor.metagraph.return_value = metagraph + + def fake_query_map(module, storage_function, params): + for hotkey, raw in rows: + key = SimpleNamespace(value=hotkey) + # Fake the ink!-shaped metadata that decode_commitment_field walks. + metadata = SimpleNamespace(value={'info': {'fields': [{'Raw0': '0x' + raw.encode().hex()}]}}) + yield key, metadata + + subtensor.substrate.query_map.side_effect = fake_query_map + return subtensor + + def test_returns_parsed_pairs_for_every_registered_miner(self): + subtensor = self.make_subtensor( + hotkeys=['hk_a', 'hk_b'], + rows=[ + ('hk_a', 'v1:btc:bc1qaddr_a:tao:5C_a:340:350'), + ('hk_b', 'v1:btc:bc1qaddr_b:tao:5C_b:345:355'), + ], + ) + pairs = read_miner_commitments(subtensor, netuid=7) + assert len(pairs) == 2 + by_hotkey = {p.hotkey: p for p in pairs} + assert by_hotkey['hk_a'].uid == 0 + assert by_hotkey['hk_b'].uid == 1 + assert by_hotkey['hk_a'].rate == 340.0 + assert by_hotkey['hk_b'].rate == 345.0 + + def test_drops_dereg_hotkey_still_in_storage(self): + """A miner can deregister before their commitment is cleared from + Commitments.CommitmentOf. Those rows must be skipped.""" + subtensor = self.make_subtensor( + hotkeys=['hk_live'], # only hk_live is in the metagraph + rows=[ + ('hk_live', 'v1:btc:bc1qlive:tao:5Clive:340:350'), + ('hk_ghost', 'v1:btc:bc1qghost:tao:5Cghost:999:999'), + ], + ) + pairs = read_miner_commitments(subtensor, netuid=7) + assert [p.hotkey for p in pairs] == ['hk_live'] + + def test_single_query_map_call(self): + """Regression guard: we must not fall back into an N-RPC loop.""" + subtensor = self.make_subtensor( + hotkeys=['hk_a', 'hk_b', 'hk_c'], + rows=[('hk_a', 'v1:btc:a:tao:a:1:1')], + ) + read_miner_commitments(subtensor, netuid=7) + assert subtensor.substrate.query_map.call_count == 1 + # And no per-hotkey query() calls leaked back in. 
+ assert subtensor.substrate.query.call_count == 0 + + def test_transient_error_returns_empty_list(self): + """ConnectionError / TimeoutError during query_map shouldn't raise.""" + subtensor = MagicMock() + subtensor.metagraph.return_value = SimpleNamespace(hotkeys=['hk_a'], n=SimpleNamespace(item=lambda: 1)) + subtensor.substrate.query_map.side_effect = ConnectionError('websocket dead') + with patch('allways.commitments.bt.logging.warning'): + pairs = read_miner_commitments(subtensor, netuid=7) + assert pairs == [] diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py index c01fc9c..fbc5eca 100644 --- a/tests/test_event_watcher.py +++ b/tests/test_event_watcher.py @@ -124,14 +124,14 @@ class TestSwapOutcomePersistence: def test_completed_writes_ledger(self, tmp_path: Path): w = make_watcher(tmp_path) w.apply_event(100, 'SwapCompleted', {'swap_id': 42, 'miner': 'hk_a'}) - stats = w.state_store.get_all_time_success_rates() + stats = w.state_store.get_success_rates_since(0) assert stats['hk_a'] == (1, 0) w.state_store.close() def test_timed_out_writes_ledger(self, tmp_path: Path): w = make_watcher(tmp_path) w.apply_event(100, 'SwapTimedOut', {'swap_id': 42, 'miner': 'hk_a'}) - stats = w.state_store.get_all_time_success_rates() + stats = w.state_store.get_success_rates_since(0) assert stats['hk_a'] == (0, 1) w.state_store.close() @@ -140,11 +140,74 @@ def test_mixed_outcomes_counted(self, tmp_path: Path): w.apply_event(100, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) w.apply_event(101, 'SwapCompleted', {'swap_id': 2, 'miner': 'hk_a'}) w.apply_event(102, 'SwapTimedOut', {'swap_id': 3, 'miner': 'hk_a'}) - stats = w.state_store.get_all_time_success_rates() + stats = w.state_store.get_success_rates_since(0) assert stats['hk_a'] == (2, 1) w.state_store.close() +class TestBusyIntervals: + def test_initiate_marks_busy_then_complete_frees(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.apply_event(100, 'SwapInitiated', {'swap_id': 1, 'miner': 
'hk_a'}) + assert 'hk_a' in w.get_busy_miners_at(100) + assert w.open_swap_count['hk_a'] == 1 + + w.apply_event(150, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + assert w.open_swap_count['hk_a'] == 0 + assert 'hk_a' not in w.get_busy_miners_at(150) + w.state_store.close() + + def test_timeout_frees_busy_miner(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.apply_event(100, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + w.apply_event(500, 'SwapTimedOut', {'swap_id': 1, 'miner': 'hk_a'}) + assert w.open_swap_count['hk_a'] == 0 + assert 'hk_a' not in w.get_busy_miners_at(500) + w.state_store.close() + + def test_get_busy_events_in_range_is_block_filtered(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.apply_event(100, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + w.apply_event(200, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + w.apply_event(300, 'SwapInitiated', {'swap_id': 2, 'miner': 'hk_b'}) + # Range is (start, end]: block 100 is excluded, 200/300 included + events = w.get_busy_events_in_range(100, 300) + assert [(e['block'], e['hotkey'], e['delta']) for e in events] == [ + (200, 'hk_a', -1), + (300, 'hk_b', +1), + ] + w.state_store.close() + + def test_count_never_goes_negative(self, tmp_path: Path): + """A terminal event with no matching initiate (e.g. 
bootstrap gap) + is dropped rather than letting count go negative.""" + w = make_watcher(tmp_path) + w.apply_event(500, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + assert w.open_swap_count.get('hk_a', 0) == 0 + # And no event was recorded + assert w.busy_events == [] + w.state_store.close() + + def test_bootstrap_seeds_busy_from_active_swaps(self, tmp_path: Path): + from unittest.mock import MagicMock + + w = make_watcher(tmp_path) + client = MagicMock() + client.get_miner_collateral.return_value = 0 + client.get_miner_active_flag.return_value = False + client.get_min_collateral.return_value = 0 + client.get_active_swaps.return_value = [ + type('S', (), {'miner_hotkey': 'hk_a', 'initiated_block': 50})(), + type('S', (), {'miner_hotkey': 'hk_b', 'initiated_block': 80})(), + ] + w.initialize(current_block=100, metagraph_hotkeys=['hk_a', 'hk_b'], contract_client=client) + + assert w.open_swap_count == {'hk_a': 1, 'hk_b': 1} + busy_now = w.get_busy_miners_at(100) + assert busy_now == {'hk_a': 1, 'hk_b': 1} + w.state_store.close() + + class TestSCALEDecoder: """Decoder fixtures: hand-build event bytes and feed them through. 
@@ -237,18 +300,22 @@ class TestBootstrap: """initialize() snapshotting behavior — the M1 fix.""" def test_bootstrap_seeds_collateral_and_active_from_contract(self, tmp_path: Path): + from allways.constants import SCORING_WINDOW_BLOCKS + w = make_watcher(tmp_path) client = MagicMock() client.get_miner_collateral.side_effect = lambda hk: {'hk_a': 10, 'hk_b': 20}.get(hk, 0) client.get_miner_active_flag.side_effect = lambda hk: hk == 'hk_a' client.get_min_collateral.return_value = 5 - w.initialize(current_block=1000, metagraph_hotkeys=['hk_a', 'hk_b'], contract_client=client) + current_block = SCORING_WINDOW_BLOCKS + 500 # well past the backfill floor + w.initialize(current_block=current_block, metagraph_hotkeys=['hk_a', 'hk_b'], contract_client=client) assert w.collateral == {'hk_a': 10, 'hk_b': 20} assert w.active_miners == {'hk_a'} assert w.min_collateral == 5 - assert w.cursor == 1000 + # Cursor rewinds one scoring window so sync_to backfills the crown-time history. + assert w.cursor == current_block - SCORING_WINDOW_BLOCKS w.state_store.close() def test_bootstrap_tolerates_contract_read_failures(self, tmp_path: Path): @@ -258,12 +325,13 @@ def test_bootstrap_tolerates_contract_read_failures(self, tmp_path: Path): client.get_miner_active_flag.side_effect = RuntimeError('rpc down') client.get_min_collateral.side_effect = RuntimeError('rpc down') + # Pre-window start (current_block < SCORING_WINDOW_BLOCKS) — cursor clamps at 0. 
w.initialize(current_block=500, metagraph_hotkeys=['hk_a'], contract_client=client) # Everything defaults to empty/starting state, no exception propagated assert w.collateral == {} assert w.active_miners == set() - assert w.cursor == 500 + assert w.cursor == 0 w.state_store.close() diff --git a/tests/test_pending_confirm_queue.py b/tests/test_pending_confirm_queue.py index d413391..c15647b 100644 --- a/tests/test_pending_confirm_queue.py +++ b/tests/test_pending_confirm_queue.py @@ -88,7 +88,7 @@ def test_has_reflects_enqueue_and_remove(self, tmp_path: Path): assert removed.miner_hotkey == 'miner-1' assert not queue.has('miner-1') - def test_purge_expired_pending_removes_stale_entries(self, tmp_path: Path): + def test_purge_expired_pending_confirms_removes_stale_entries(self, tmp_path: Path): db_path = tmp_path / 'state.db' queue = ValidatorStateStore( db_path=db_path, @@ -98,7 +98,7 @@ def test_purge_expired_pending_removes_stale_entries(self, tmp_path: Path): queue.enqueue(PENDING_CONFIRM_SAMPLE1) # reserved_until=100 → expired at block 101 queue.enqueue(replace(PENDING_CONFIRM_SAMPLE2, reserved_until=105)) - removed = queue.purge_expired_pending() + removed = queue.purge_expired_pending_confirms() assert removed == 1 items = queue.get_all() diff --git a/tests/test_poll_commitments.py b/tests/test_poll_commitments.py index aa61e74..d0edc76 100644 --- a/tests/test_poll_commitments.py +++ b/tests/test_poll_commitments.py @@ -3,11 +3,7 @@ from unittest.mock import MagicMock, patch from allways.classes import MinerPair -from allways.constants import ( - COMMITMENT_POLL_INTERVAL_BLOCKS, - EVENT_RETENTION_BLOCKS, - RATE_UPDATE_MIN_INTERVAL_BLOCKS, -) +from allways.constants import SCORING_WINDOW_BLOCKS from allways.validator.forward import poll_commitments from allways.validator.state_store import ValidatorStateStore @@ -46,7 +42,6 @@ def make_validator(tmp_path: Path, hotkeys=None) -> SimpleNamespace: contract_client=MagicMock(), event_watcher=MagicMock(), 
last_known_rates={}, - last_commitment_poll_block=0, ) @@ -66,7 +61,6 @@ def test_first_poll_inserts_both_directions_per_miner(self, tmp_path: Path): btc_tao = v.state_store.get_rate_events_in_range('btc', 'tao', 0, 2000) assert len(tao_btc) == 2 assert len(btc_tao) == 2 - assert v.last_commitment_poll_block == v.block assert v.last_known_rates == { ('hk_a', 'tao', 'btc'): 0.00015, ('hk_a', 'btc', 'tao'): 6500.0, @@ -75,17 +69,6 @@ def test_first_poll_inserts_both_directions_per_miner(self, tmp_path: Path): } v.state_store.close() - def test_poll_within_interval_is_noop(self, tmp_path: Path): - v = make_validator(tmp_path) - v.last_commitment_poll_block = v.block - (COMMITMENT_POLL_INTERVAL_BLOCKS - 1) - - mock_read = MagicMock(return_value=[]) - with patch('allways.validator.forward.read_miner_commitments', mock_read): - poll_commitments(v) - - mock_read.assert_not_called() - v.state_store.close() - class TestPollCommitmentsChanges: def test_no_changes_across_polls_inserts_nothing_extra(self, tmp_path: Path): @@ -95,7 +78,7 @@ def test_no_changes_across_polls_inserts_nothing_extra(self, tmp_path: Path): with patch('allways.validator.forward.read_miner_commitments', return_value=pairs): poll_commitments(v) - v.block += COMMITMENT_POLL_INTERVAL_BLOCKS + v.block += 1 with patch('allways.validator.forward.read_miner_commitments', return_value=pairs): poll_commitments(v) @@ -103,7 +86,9 @@ def test_no_changes_across_polls_inserts_nothing_extra(self, tmp_path: Path): assert len(tao_btc) == 1 v.state_store.close() - def test_rate_change_inserts_new_event_past_throttle(self, tmp_path: Path): + def test_rate_change_inserts_new_event_every_block(self, tmp_path: Path): + """Per-block polling — a rate change is recorded immediately with no + throttle delay.""" v = make_validator(tmp_path) pairs_v1 = [make_pair('hk_a', rate=0.00015, counter_rate=6500.0)] pairs_v2 = [make_pair('hk_a', rate=0.00020, counter_rate=6500.0)] @@ -111,8 +96,8 @@ def 
test_rate_change_inserts_new_event_past_throttle(self, tmp_path: Path): with patch('allways.validator.forward.read_miner_commitments', return_value=pairs_v1): poll_commitments(v) - # Advance past both the poll interval AND the rate throttle - v.block += RATE_UPDATE_MIN_INTERVAL_BLOCKS + # A single block later — the throttle is gone, so the change lands. + v.block += 1 with patch('allways.validator.forward.read_miner_commitments', return_value=pairs_v2): poll_commitments(v) @@ -123,27 +108,6 @@ def test_rate_change_inserts_new_event_past_throttle(self, tmp_path: Path): assert [e['rate'] for e in btc_tao] == [6500.0] v.state_store.close() - def test_rate_change_blocked_by_throttle_still_advances_cache(self, tmp_path: Path): - v = make_validator(tmp_path) - pairs_v1 = [make_pair('hk_a', rate=0.00015, counter_rate=0.0)] - pairs_v2 = [make_pair('hk_a', rate=0.00020, counter_rate=0.0)] - - with patch('allways.validator.forward.read_miner_commitments', return_value=pairs_v1): - poll_commitments(v) - - # Only past poll interval, NOT past rate throttle. - v.block += COMMITMENT_POLL_INTERVAL_BLOCKS - with patch('allways.validator.forward.read_miner_commitments', return_value=pairs_v2): - poll_commitments(v) - - # Throttle blocked the store insert — only the first event lands. - tao_btc = v.state_store.get_rate_events_in_range('tao', 'btc', 0, 10_000) - assert [e['rate'] for e in tao_btc] == [0.00015] - # But the in-memory cache advances to the observed value so the next - # poll doesn't waste an insert attempt on the same throttled rate. 
- assert v.last_known_rates[('hk_a', 'tao', 'btc')] == 0.00020 - v.state_store.close() - class TestPollCommitmentsZeroRate: def test_zero_rate_skips_only_that_direction(self, tmp_path: Path): @@ -175,7 +139,7 @@ def test_dereg_removes_hotkey_from_store_and_cache(self, tmp_path: Path): # hk_b deregistered v.metagraph.hotkeys = ['hk_a'] - v.block += COMMITMENT_POLL_INTERVAL_BLOCKS + v.block += 1 with patch('allways.validator.forward.read_miner_commitments', return_value=pairs): poll_commitments(v) @@ -195,36 +159,50 @@ def raiser(*args, **kwargs): with patch('allways.validator.forward.read_miner_commitments', side_effect=raiser): poll_commitments(v) - # No events, but the poll block WAS advanced so we don't hot-retry + # No events persisted on a failed read. assert v.state_store.get_rate_events_in_range('tao', 'btc', 0, 10_000) == [] - assert v.last_commitment_poll_block == v.block v.state_store.close() class TestPollCommitmentsPruning: - def test_prune_removes_events_older_than_retention_window(self, tmp_path: Path): + def test_prune_runs_via_scoring_pass_not_commitment_poll(self, tmp_path: Path): + """Pruning moved out of the per-tick path and into the scoring round — + verify both parts of that contract: commitment polling does NOT prune, + and score_and_reward_miners DOES. The latest row per (hotkey, direction) + is preserved as a state-reconstruction anchor even when it's older than + the cutoff, so this test uses a hotkey with two rows to exercise the + "older one drops, newer one survives" path.""" + from allways.validator.scoring import prune_rate_events + v = make_validator(tmp_path) - # Move the clock forward so the retention cutoff is meaningful. 
- v.block = EVENT_RETENTION_BLOCKS + 1_000 - ancient_block = 1 # well before cutoff (v.block - EVENT_RETENTION_BLOCKS = 1000) - recent_block = v.block - 100 # safely inside retention + v.block = SCORING_WINDOW_BLOCKS + 1_000 + ancient_block = 1 + recent_block = v.block - 100 conn = v.state_store.require_connection() + # Two rows for the same direction — the ancient one must drop on prune + # while the recent one survives. conn.execute( 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', - ('hk_ancient', 'tao', 'btc', 0.00010, ancient_block), + ('hk_a', 'tao', 'btc', 0.00010, ancient_block), ) conn.execute( 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', - ('hk_recent', 'tao', 'btc', 0.00020, recent_block), + ('hk_a', 'tao', 'btc', 0.00020, recent_block), ) conn.commit() + # 1. Commitment polling no longer prunes — both rows survive. with patch('allways.validator.forward.read_miner_commitments', return_value=[]): poll_commitments(v) + surviving_blocks = {e['block'] for e in v.state_store.get_rate_events_in_range('tao', 'btc', 0, v.block + 1)} + assert ancient_block in surviving_blocks, 'poll_commitments should not prune' + assert recent_block in surviving_blocks - rate_events = v.state_store.get_rate_events_in_range('tao', 'btc', 0, v.block + 1) - surviving_blocks = {e['block'] for e in rate_events} + # 2. Scoring pass prunes the ancient row; the latest row stays as anchor. 
+ prune_rate_events(v) + surviving_blocks = {e['block'] for e in v.state_store.get_rate_events_in_range('tao', 'btc', 0, v.block + 1)} assert ancient_block not in surviving_blocks assert recent_block in surviving_blocks + v.state_store.close() diff --git a/tests/test_rate_state.py b/tests/test_rate_state.py index faabb60..6a4ee3e 100644 --- a/tests/test_rate_state.py +++ b/tests/test_rate_state.py @@ -3,7 +3,6 @@ import pytest -from allways.constants import RATE_UPDATE_MIN_INTERVAL_BLOCKS from allways.validator.state_store import ValidatorStateStore @@ -39,19 +38,13 @@ def test_first_event_accepted(self, tmp_path: Path): assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) is True store.close() - def test_rejected_when_within_throttle_window(self, tmp_path: Path): + def test_rate_change_next_block_is_accepted(self, tmp_path: Path): + """No throttle — a rate change one block later lands immediately.""" store = make_store(tmp_path) assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) is True - # 74 < 75: blocked by throttle - within = 100 + RATE_UPDATE_MIN_INTERVAL_BLOCKS - 1 - assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00016, block=within) is False - store.close() - - def test_accepted_when_past_throttle_window(self, tmp_path: Path): - store = make_store(tmp_path) - assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) is True - past = 100 + RATE_UPDATE_MIN_INTERVAL_BLOCKS - assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00016, block=past) is True + assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00016, block=101) is True + events = store.get_rate_events_in_range('tao', 'btc', start_block=99, end_block=200) + assert [e['rate'] for e in events] == [0.00015, 0.00016] store.close() def test_rejected_when_rate_unchanged(self, tmp_path: Path): @@ -60,7 +53,7 @@ def test_rejected_when_rate_unchanged(self, tmp_path: Path): assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=200) 
is False store.close() - def test_accepted_when_rate_changed_and_past_throttle(self, tmp_path: Path): + def test_accepted_when_rate_changes(self, tmp_path: Path): store = make_store(tmp_path) assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) is True assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00020, block=200) is True @@ -69,10 +62,10 @@ def test_accepted_when_rate_changed_and_past_throttle(self, tmp_path: Path): store.close() def test_direction_isolation(self, tmp_path: Path): - """Throttle is per (hotkey, from, to) — different directions don't conflict.""" + """Dedupe is per (hotkey, from, to) — different directions don't conflict.""" store = make_store(tmp_path) assert store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) is True - # Same hotkey, other direction — should not be throttled + # Same hotkey, other direction — same-rate dedupe only checks its own direction assert store.insert_rate_event('hk1', 'btc', 'tao', 6500.0, block=105) is True store.close() @@ -83,7 +76,7 @@ def test_idempotent_on_swap_id(self, tmp_path: Path): store.insert_swap_outcome(swap_id=1, miner_hotkey='hk1', completed=True, resolved_block=100) store.insert_swap_outcome(swap_id=1, miner_hotkey='hk1', completed=False, resolved_block=101) - rates = store.get_all_time_success_rates() + rates = store.get_success_rates_since(0) # Second insert replaced the first: 0 completed, 1 timed_out assert rates == {'hk1': (0, 1)} store.close() @@ -138,10 +131,42 @@ def test_aggregates_completed_and_timed_out(self, tmp_path: Path): store.insert_swap_outcome(swap_id=3, miner_hotkey='hk1', completed=False, resolved_block=102) store.insert_swap_outcome(swap_id=4, miner_hotkey='hk2', completed=True, resolved_block=103) - rates = store.get_all_time_success_rates() + rates = store.get_success_rates_since(0) assert rates == {'hk1': (2, 1), 'hk2': (1, 0)} store.close() + def test_excludes_outcomes_before_since_block(self, tmp_path: Path): + """Rolling window — 
outcomes before the cutoff don't count.""" + store = make_store(tmp_path) + store.insert_swap_outcome(swap_id=1, miner_hotkey='hk1', completed=False, resolved_block=100) + store.insert_swap_outcome(swap_id=2, miner_hotkey='hk1', completed=True, resolved_block=500) + + rates = store.get_success_rates_since(200) + assert rates == {'hk1': (1, 0)} # ancient timeout aged out + store.close() + + +class TestPruneSwapOutcomes: + def test_prune_removes_old_outcomes_only(self, tmp_path: Path): + store = make_store(tmp_path) + store.insert_swap_outcome(swap_id=1, miner_hotkey='hk1', completed=True, resolved_block=100) + store.insert_swap_outcome(swap_id=2, miner_hotkey='hk1', completed=True, resolved_block=500) + + store.prune_swap_outcomes_older_than(cutoff_block=200) + + rates = store.get_success_rates_since(0) + assert rates == {'hk1': (1, 0)} # only the resolved_block=500 outcome survives + store.close() + + def test_prune_noop_when_cutoff_nonpositive(self, tmp_path: Path): + store = make_store(tmp_path) + store.insert_swap_outcome(swap_id=1, miner_hotkey='hk1', completed=True, resolved_block=100) + store.prune_swap_outcomes_older_than(cutoff_block=0) + store.prune_swap_outcomes_older_than(cutoff_block=-100) + rates = store.get_success_rates_since(0) + assert rates == {'hk1': (1, 0)} + store.close() + class TestDeleteHotkey: def test_removes_from_rate_and_outcome_tables(self, tmp_path: Path): @@ -156,25 +181,66 @@ def test_removes_from_rate_and_outcome_tables(self, tmp_path: Path): store.delete_hotkey('hk1') assert store.get_latest_rate_before('hk1', 'tao', 'btc', block=200) is None - assert 'hk1' not in store.get_all_time_success_rates() + assert 'hk1' not in store.get_success_rates_since(0) # hk2 untouched assert store.get_latest_rate_before('hk2', 'tao', 'btc', block=200) is not None - assert 'hk2' in store.get_all_time_success_rates() + assert 'hk2' in store.get_success_rates_since(0) store.close() class TestPrune: def test_prune_leaves_swap_outcomes_intact(self, 
tmp_path: Path): + """Pruning only touches rate_events — swap_outcomes has its own lifetime.""" store = make_store(tmp_path) store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) store.insert_swap_outcome(swap_id=1, miner_hotkey='hk1', completed=True, resolved_block=100) store.prune_events_older_than(cutoff_block=200) - # Rate events gone, outcomes retained - assert store.get_latest_rate_before('hk1', 'tao', 'btc', block=200) is None - assert store.get_all_time_success_rates() == {'hk1': (1, 0)} + # Swap outcomes untouched by rate-event prune. + assert store.get_success_rates_since(0) == {'hk1': (1, 0)} + store.close() + + def test_prune_preserves_latest_row_per_direction(self, tmp_path: Path): + """A miner's single rate row must survive even when it's older than + the cutoff — otherwise get_latest_rate_before at window_start would + find nothing and the miner falls out of scoring entirely.""" + store = make_store(tmp_path) + store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) + + # Cutoff is way past block 100, but the row is the only anchor. + store.prune_events_older_than(cutoff_block=5_000) + + assert store.get_latest_rate_before('hk1', 'tao', 'btc', block=10_000) == (0.00015, 100) + store.close() + + def test_prune_drops_older_rows_when_newer_exists(self, tmp_path: Path): + """When a direction has multiple rows, rows older than the cutoff + get pruned as long as a newer row survives as the anchor.""" + store = make_store(tmp_path) + store.insert_rate_event('hk1', 'tao', 'btc', 0.00010, block=100) + store.insert_rate_event('hk1', 'tao', 'btc', 0.00020, block=200) + store.insert_rate_event('hk1', 'tao', 'btc', 0.00030, block=6_000) + + store.prune_events_older_than(cutoff_block=5_000) + + # blocks 100 and 200 drop; block 6000 survives. 
+ events = store.get_rate_events_in_range('tao', 'btc', start_block=0, end_block=10_000) + assert [e['block'] for e in events] == [6_000] + store.close() + + def test_prune_preserves_latest_per_direction_independently(self, tmp_path: Path): + """Preservation is keyed on (hotkey, from_chain, to_chain) — each + direction keeps its own anchor row.""" + store = make_store(tmp_path) + store.insert_rate_event('hk1', 'tao', 'btc', 0.00015, block=100) + store.insert_rate_event('hk1', 'btc', 'tao', 6500.0, block=100) + + store.prune_events_older_than(cutoff_block=5_000) + + assert store.get_latest_rate_before('hk1', 'tao', 'btc', block=10_000) == (0.00015, 100) + assert store.get_latest_rate_before('hk1', 'btc', 'tao', block=10_000) == (6500.0, 100) store.close() diff --git a/tests/test_scoring_v1.py b/tests/test_scoring_v1.py index f82b6c6..e71ca4e 100644 --- a/tests/test_scoring_v1.py +++ b/tests/test_scoring_v1.py @@ -8,7 +8,7 @@ from allways.constants import RECYCLE_UID, SUCCESS_EXPONENT from allways.validator.event_watcher import ContractEventWatcher -from allways.validator.forward import ( +from allways.validator.scoring import ( calculate_miner_rewards, crown_holders_at_instant, replay_crown_time_window, @@ -98,6 +98,20 @@ def test_tied_best_rate_returns_all(self): holders = set(crown_holders_at_instant(rates, collaterals, MIN_COLLATERAL, {'a', 'b'})) assert holders == {'a', 'b'} + def test_busy_best_rate_loses_to_idle_runner_up(self): + """Miner A has the best rate but is mid-swap — crown goes to B.""" + rates = {'a': 0.00030, 'b': 0.00020} + collaterals = {'a': MIN_COLLATERAL, 'b': MIN_COLLATERAL} + holders = crown_holders_at_instant(rates, collaterals, MIN_COLLATERAL, {'a', 'b'}, busy={'a'}) + assert holders == ['b'] + + def test_all_busy_returns_empty(self): + """Every eligible miner is busy → no crown → pool recycles.""" + rates = {'a': 0.00030, 'b': 0.00020} + collaterals = {'a': MIN_COLLATERAL, 'b': MIN_COLLATERAL} + holders = crown_holders_at_instant(rates, 
collaterals, MIN_COLLATERAL, {'a', 'b'}, busy={'a', 'b'}) + assert holders == [] + class TestReplayCrownTime: def test_single_miner_holds_full_window(self, tmp_path: Path): @@ -235,6 +249,112 @@ def test_window_start_state_reconstruction_from_pre_window_events(self, tmp_path assert crown == {'hk_a': 1000.0} store.close() + def test_best_rate_miner_goes_busy_credit_flows_to_runner_up(self, tmp_path: Path): + """A holds the best rate but takes a swap at block 400 that resolves + at block 800. During [400, 800] the crown flips to idle runner-up B.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + for row in ( + ('hk_a', 'tao', 'btc', 0.00030, 0), # A is best + ('hk_b', 'tao', 'btc', 0.00020, 0), # B is runner-up + ): + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + row, + ) + conn.commit() + seed_collateral(watcher, 'hk_a', MIN_COLLATERAL, 0) + seed_collateral(watcher, 'hk_b', MIN_COLLATERAL, 0) + + # A goes busy with a swap at 400, completes at 800. 
+ watcher.apply_event(400, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event(800, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + + crown = replay_crown_time_window( + store=store, + event_watcher=watcher, + from_chain='tao', + to_chain='btc', + window_start=100, + window_end=1100, + eligible_hotkeys={'hk_a', 'hk_b'}, + min_collateral=MIN_COLLATERAL, + ) + # A earns (100,400] = 300 + (800,1100] = 300 → 600 total + # B earns (400,800] = 400 total + assert crown == {'hk_a': 600.0, 'hk_b': 400.0} + store.close() + + def test_solo_miner_busy_pool_recycles(self, tmp_path: Path): + """Only one miner has a rate, they're busy for part of the window — + nobody else is eligible, so the busy period earns nothing (recycles).""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + conn = store.require_connection() + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + ('hk_a', 'tao', 'btc', 0.00020, 0), + ) + conn.commit() + seed_collateral(watcher, 'hk_a', MIN_COLLATERAL, 0) + + watcher.apply_event(400, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event(900, 'SwapTimedOut', {'swap_id': 1, 'miner': 'hk_a'}) + + crown = replay_crown_time_window( + store=store, + event_watcher=watcher, + from_chain='tao', + to_chain='btc', + window_start=100, + window_end=1100, + eligible_hotkeys={'hk_a'}, + min_collateral=MIN_COLLATERAL, + ) + # A earns (100,400] = 300 + (900,1100] = 200 → 500. The 500 blocks + # of busy interval have no idle candidate → not credited to anyone + # (the caller recycles via the remainder). 
+ assert crown == {'hk_a': 500.0} + store.close() + + def test_busy_state_at_window_start_is_reconstructed(self, tmp_path: Path): + """Miner A's SwapInitiated fires before window_start and doesn't + resolve until mid-window — replay must see A as busy from the start.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + for row in ( + ('hk_a', 'tao', 'btc', 0.00030, 0), + ('hk_b', 'tao', 'btc', 0.00020, 0), + ): + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + row, + ) + conn.commit() + seed_collateral(watcher, 'hk_a', MIN_COLLATERAL, 0) + seed_collateral(watcher, 'hk_b', MIN_COLLATERAL, 0) + + # A's swap started BEFORE the window opens and completes inside it. + watcher.apply_event(50, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event(500, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + + crown = replay_crown_time_window( + store=store, + event_watcher=watcher, + from_chain='tao', + to_chain='btc', + window_start=100, + window_end=1100, + eligible_hotkeys={'hk_a', 'hk_b'}, + min_collateral=MIN_COLLATERAL, + ) + # From window_start=100 A is already busy (reconstructed from pre-window + # SwapInitiated). B earns (100,500] = 400; A earns (500,1100] = 600. 
+ assert crown == {'hk_b': 400.0, 'hk_a': 600.0} + store.close() + class TestCalculateMinerRewards: def test_empty_direction_recycles_full_pool(self, tmp_path: Path): diff --git a/tests/test_swap_tracker.py b/tests/test_swap_tracker.py index 7774e9b..321dee3 100644 --- a/tests/test_swap_tracker.py +++ b/tests/test_swap_tracker.py @@ -310,16 +310,19 @@ def test_timeout_zero_treated_as_unbounded(self): class TestGetNearTimeoutFulfilled: def test_returns_swaps_within_threshold(self): + from allways.constants import EXTEND_THRESHOLD_BLOCKS + tracker = make_tracker() near = make_swap(swap_id=1, timeout_block=100) near.status = SwapStatus.FULFILLED - far = make_swap(swap_id=2, timeout_block=500) + far = make_swap(swap_id=2, timeout_block=100 + EXTEND_THRESHOLD_BLOCKS * 10) far.status = SwapStatus.FULFILLED tracker.active[1] = near tracker.active[2] = far - # current=90, threshold=20 → near qualifies (90 >= 100 - 20), far doesn't - result = tracker.get_near_timeout_fulfilled(current_block=90, threshold=20) + # current = timeout_block - threshold → near qualifies, far doesn't + current_block = 100 - EXTEND_THRESHOLD_BLOCKS + result = tracker.get_near_timeout_fulfilled(current_block=current_block) assert [s.id for s in result] == [1] def test_excludes_active_status(self): @@ -327,7 +330,7 @@ def test_excludes_active_status(self): active = make_swap(swap_id=1, timeout_block=100) tracker.active[1] = active - assert tracker.get_near_timeout_fulfilled(current_block=90, threshold=20) == [] + assert tracker.get_near_timeout_fulfilled(current_block=90) == [] class TestGetTimedOut: