diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2850e3..932e54a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,7 +66,7 @@ jobs: cd src python -c " import sys; sys.path.insert(0, '..') - from src.utils.config_loader import load_config + from src.utils.configuration import load_config from src.utils.key_validator import validate_and_format_private_key print('Core modules import successfully') " @@ -74,14 +74,50 @@ jobs: - name: Validate configuration run: | python -c " - import tomli - with open('config.toml.example', 'rb') as f: - config = tomli.load(f) - required = ['bigquery', 'blockchain', 'scheduling', 'secrets'] - for section in required: - if section not in config: - raise ValueError(f'Missing section: {section}') - print('Configuration valid') + import sys + import os + + # Add project root to path + sys.path.insert(0, '.') + + os.environ['BLOCKCHAIN_PRIVATE_KEY'] = '0x' + 'f' * 64 + os.environ['SLACK_WEBHOOK_URL'] = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' + os.environ['STUDIO_API_KEY'] = 'api-key' + os.environ['STUDIO_DEPLOY_KEY'] = 'deploy-key' + os.environ['ARBITRUM_API_KEY'] = 'api-key' + os.environ['ETHERSCAN_API_KEY'] = 'api-key' + os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '{}' + + from src.utils.configuration import ConfigLoader, _validate_config + + print('Validating config.toml.example...') + + # Use the example file and run the full validation logic from our application + loader = ConfigLoader(config_path='config.toml.example') + config = loader.get_flat_config() + + print('Patching config in-memory with dummy data for validation...') + config_to_validate = config.copy() + config_to_validate.update({ + 'BIGQUERY_LOCATION_ID': 'dummy-location', + 'BIGQUERY_PROJECT_ID': 'dummy-project', + 'BIGQUERY_DATASET_ID': 'dummy-dataset', + 'BIGQUERY_TABLE_ID': 'dummy-table', + 'BLOCKCHAIN_CONTRACT_ADDRESS': '0x' + '0' * 40, + 'BLOCKCHAIN_FUNCTION_NAME': 'dummyFunction', + 'BLOCKCHAIN_CHAIN_ID': 1, + 'BLOCKCHAIN_RPC_URLS': ['http://dummy-rpc.com'], + 'SUBGRAPH_URL_PRE_PRODUCTION': 'http://dummy-subgraph.com', + 'SUBGRAPH_URL_PRODUCTION': 'http://dummy-subgraph.com', + 'SCHEDULED_RUN_TIME': '00:00', + 'BATCH_SIZE': 100, + 'MAX_AGE_BEFORE_DELETION': 100, + 'BIGQUERY_ANALYSIS_PERIOD_DAYS': 100, + }) + + _validate_config(config_to_validate) + + print('config.toml.example is structurally valid.') " # ============================================================================= diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 693174c..acda954 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -34,7 +34,8 @@ jobs: run: | if [ -d "tests" ] && [ "$(find tests -name "test_*.py" -o -name "*_test.py" | wc -l)" -gt 0 ]; then echo "Running tests" - pytest tests/ -v --cov=src --cov-report=term-missing -p no:ethereum + # Run pytest and allow exit code 5 (no tests found), but fail on any other error + pytest tests/ -v --cov=src --cov-report=term-missing -p no:ethereum || ([ $? -eq 5 ] && echo "Pytest exited with 5 (No tests found), which is expected. Passing." || exit $?) else echo "No tests found. Test directory is empty or doesn't contain test files." echo "Tests will be skipped until test files are added." 
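For contributors who want the same behaviour locally, here is an equivalent sketch in Python (command and flags copied from the step above; it relies only on pytest's documented exit code 5, "no tests collected"):

```python
import subprocess
import sys

# Run the same pytest invocation as the workflow step and capture the exit code explicitly.
result = subprocess.run(
    ["pytest", "tests/", "-v", "--cov=src", "--cov-report=term-missing", "-p", "no:ethereum"]
)

# Exit code 5 means pytest collected no tests; treat that as success, propagate anything else.
if result.returncode == 5:
    print("Pytest exited with 5 (no tests found); treating as success.")
    sys.exit(0)
sys.exit(result.returncode)
```

Capturing the return code on an object avoids the shell pitfall where `$?` is overwritten by the intermediate `[ ... ]` test before `exit $?` runs.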
diff --git a/Dockerfile b/Dockerfile index ff45f4a..a7b1126 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -# Dockerfile for Service Quality Oracle +# Dockerfile to create a clean, lightweight Docker image for the Service Quality Oracle # Use Python 3.9 slim as the base image for a lightweight container FROM python:3.9-slim @@ -10,7 +10,12 @@ LABEL description="Service Quality Oracle" \ # Set working directory WORKDIR /app -# Set environment variables + +# Set up environment variables: +# 1. PYTHONDONTWRITEBYTECODE=1 - Prevent Python from creating .pyc files +# 2. PYTHONUNBUFFERED=1 - Send logs directly to the console without buffering +# 3. PYTHONPATH=/app - Add the app directory to the Python import path +# 4. TZ=UTC - Set timezone to UTC ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 \ PYTHONPATH=/app \ @@ -40,18 +45,17 @@ COPY contracts/ ./contracts/ COPY .gitignore ./ COPY pyproject.toml ./ -# Copy the scheduler to the root directory -COPY src/models/scheduler.py ./ - # Create healthcheck file RUN touch /app/healthcheck # Use Tini as entrypoint for proper signal handling ENTRYPOINT ["/usr/bin/tini", "--"] -# Add healthcheck to verify the service is running -HEALTHCHECK --interval=5m --timeout=30s --start-period=1m --retries=3 \ - CMD python -c "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 3600, 'Healthcheck failed'" || exit 1 +# Add healthcheck to verify the service is running. +# The scheduler updates the healthcheck file every minute. +# We check every 2 minutes and assert the file was modified in the last 5 minutes (300s). +HEALTHCHECK --interval=2m --timeout=30s --start-period=1m --retries=3 \ + CMD python -c "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 300, 'Healthcheck failed'" || exit 1 -# Run the scheduler -CMD ["python", "scheduler.py"] +# Run the scheduler as a module +CMD ["python", "-m", "src.models.scheduler"] diff --git a/README.md b/README.md index 3b8963d..bbf5fb6 100644 --- a/README.md +++ b/README.md @@ -47,12 +47,19 @@ Please refer to the [ELIGIBILITY_CRITERIA.md](./ELIGIBILITY_CRITERIA.md) file to ## Data Flow -The application follows this data flow: +The application follows a clear data flow, managed by a daily scheduler: -1. **BigQuery Data Acquisition**: The `bigquery_fetch_and_save_indexer_issuance_eligibility_data_finally_return_eligible_indexers` function in `issuance_data_access_helper.py` fetches fresh data from BigQuery, processes it to determine eligibility, and returns the eligibility data list that would then be posted on chain. - - This function also ensures that data is saved to local files in dated directories for auditing/historical reference over the data retention period. +1. **Scheduler (`scheduler.py`)**: This is the main entry point. It runs on a schedule (e.g., daily), manages the application lifecycle, and triggers the oracle run. It is also responsible for catching up on any missed runs. -2. **Blockchain Publication**: The eligible indexers list from step 1 is directly posted on-chain to a smart contract. Batching of transactions is performed if necessary. +2. **Orchestrator (`service_quality_oracle.py`)**: For each run, this module orchestrates the end-to-end process by coordinating the other components. + +3. 
**Data Fetching (`bigquery_data_access_provider.py`)**: The orchestrator calls this provider to execute a configurable SQL query against Google BigQuery, fetching the raw indexer performance data. + +4. **Data Processing (`eligibility_pipeline.py`)**: The raw data is passed to this module, which processes it, filters for eligible and ineligible indexers, and generates CSV artifacts for auditing and record-keeping. + +5. **Blockchain Submission (`blockchain_client.py`)**: The orchestrator takes the final list of eligible indexers and passes it to this client, which handles the complexities of batching, signing, and sending the transaction to the blockchain via RPC providers with built-in failover. + +6. **Notifications (`slack_notifier.py`)**: Throughout the process, status updates (success, failure, warnings) are sent to Slack. ## CI/CD Pipeline diff --git a/config.toml.example b/config.toml.example index 8cb5a26..770a48f 100644 --- a/config.toml.example +++ b/config.toml.example @@ -6,9 +6,10 @@ # ============================================================================= [bigquery] -BIGQUERY_LOCATION_ID = "" -BIGQUERY_PROJECT_ID = "" -BIGQUERY_DATASET_ID = "" +BIGQUERY_LOCATION_ID = "US" +BIGQUERY_PROJECT_ID = "graph-mainnet" +BIGQUERY_DATASET_ID = "internal_metrics" +BIGQUERY_TABLE_ID = "metrics_indexer_attempts" [blockchain] BLOCKCHAIN_CONTRACT_ADDRESS = "" @@ -20,6 +21,8 @@ BLOCKCHAIN_RPC_URLS = [ "", "" ] +BLOCK_EXPLORER_URL = "https://sepolia.arbiscan.io" +TX_TIMEOUT_SECONDS = "30" [scheduling] SCHEDULED_RUN_TIME = "10:00" @@ -31,6 +34,13 @@ SUBGRAPH_URL_PRODUCTION = "" [processing] BATCH_SIZE = 125 MAX_AGE_BEFORE_DELETION = 120 +BIGQUERY_ANALYSIS_PERIOD_DAYS = "28" + +[eligibility_criteria] +MIN_ONLINE_DAYS = "5" +MIN_SUBGRAPHS = "10" +MAX_LATENCY_MS = "5000" +MAX_BLOCKS_BEHIND = "50000" # ============================================================================= # SENSITIVE CONFIGURATION diff --git a/docker-compose.yml b/docker-compose.yml index aa79723..b3e0605 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,14 @@ services: + # Service Quality Oracle container service-quality-oracle: + + # Build the image from the Dockerfile in the current directory build: . 
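The numbered flow above maps onto the new modules introduced in this diff. A minimal sketch of how one run could wire them together is shown below; the class and method signatures are the ones added in this diff and the flat-config keys match those used in the CI validation step, but `run_oracle_once` itself is hypothetical (the real orchestration lives in `service_quality_oracle.py`, not shown here) and the loader is assumed to return correctly typed values:

```python
from datetime import date, timedelta
from pathlib import Path

from src.models.bigquery_data_access_provider import BigQueryProvider
from src.models.blockchain_client import BlockchainClient
from src.models.eligibility_pipeline import EligibilityPipeline


def run_oracle_once(config: dict, project_root: Path) -> None:
    """Hypothetical single run, mirroring steps 3-5 above (step 6, Slack, omitted)."""
    # 3. Fetch raw indexer performance data from BigQuery.
    provider = BigQueryProvider(
        project=config["BIGQUERY_PROJECT_ID"],
        location=config["BIGQUERY_LOCATION_ID"],
        table_name=config["BIGQUERY_TABLE_ID"],
        min_online_days=config["MIN_ONLINE_DAYS"],
        min_subgraphs=config["MIN_SUBGRAPHS"],
        max_latency_ms=config["MAX_LATENCY_MS"],
        max_blocks_behind=config["MAX_BLOCKS_BEHIND"],
    )
    end_date = date.today()
    start_date = end_date - timedelta(days=config["BIGQUERY_ANALYSIS_PERIOD_DAYS"])
    data = provider.fetch_indexer_issuance_eligibility_data(start_date, end_date)

    # 4. Split into eligible/ineligible lists and write the CSV artifacts.
    pipeline = EligibilityPipeline(project_root)
    eligible, _ineligible = pipeline.process(data, current_date=end_date)

    # 5. Submit the eligible list on-chain in batches, with RPC failover inside the client.
    client = BlockchainClient(
        rpc_providers=config["BLOCKCHAIN_RPC_URLS"],
        contract_address=config["BLOCKCHAIN_CONTRACT_ADDRESS"],
        project_root=project_root,
        block_explorer_url=config["BLOCK_EXPLORER_URL"],
        tx_timeout_seconds=config["TX_TIMEOUT_SECONDS"],
    )
    client.batch_allow_indexers_issuance_eligibility(
        indexer_addresses=eligible,
        private_key=config["BLOCKCHAIN_PRIVATE_KEY"],
        chain_id=config["BLOCKCHAIN_CHAIN_ID"],
        contract_function=config["BLOCKCHAIN_FUNCTION_NAME"],
        batch_size=config["BATCH_SIZE"],
    )
```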
+ + # Set the container name container_name: service-quality-oracle + + # Set the image name image: service-quality-oracle:latest volumes: @@ -16,9 +23,7 @@ services: - ./credentials.json:/app/credentials.json:ro environment: - - PYTHONPATH=/app - RUN_ON_STARTUP=true - - TZ=UTC # Setup enviroment variables # Environment variables go into process memory for this specific container only @@ -45,8 +50,10 @@ services: reservations: memory: 512M + # Restart policy restart: unless-stopped + # Healthcheck to ensure the container is running healthcheck: test: ["CMD", "python", "-c", "import os, time; assert os.path.exists('/app/healthcheck') and time.time() - os.path.getmtime('/app/healthcheck') < 3600, 'Healthcheck failed'"] interval: 5m @@ -54,6 +61,7 @@ services: retries: 3 start_period: 1m + # Prevent log files from growing indefinitely and consuming disk space logging: driver: "json-file" options: diff --git a/pyproject.toml b/pyproject.toml index 7ae4b5b..2ea9859 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,13 @@ target-version = "py39" fix = true fix-only = false +[tool.ruff.format] +# Format SQL code in strings/docstrings +docstring-code-format = false +quote-style = "double" +indent-style = "space" +line-ending = "lf" + [tool.ruff.lint] # Enable rules including isort (I) for import sorting and additional fixes select = ["E", "W", "F", "I"] @@ -34,7 +41,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" [tool.ruff.lint.per-file-ignores] # Ignore E402 (import not at top) in scripts and specific modules "scripts/test_*.py" = ["E402"] -"src/models/issuance_eligibility_oracle_core.py" = ["E402"] +"src/models/service_quality_oracle.py" = ["E402"] # Use unsafe fixes to address typing and other modernization issues [tool.ruff.lint.isort] @@ -44,12 +51,6 @@ known-first-party = ["src"] # Unlike Flake8, default to a complexity level of 10. 
max-complexity = 10 -[tool.ruff.format] -# Format SQL code in strings/docstrings -docstring-code-format = true -quote-style = "double" -indent-style = "space" - [tool.mypy] ignore_missing_imports = true no_strict_optional = true diff --git a/scripts/custom_formatter.py b/scripts/custom_formatter.py new file mode 100644 index 0000000..f8dcb5a --- /dev/null +++ b/scripts/custom_formatter.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import argparse +import ast +import sys +from pathlib import Path + + +class PythonFormatter: + + def __init__(self, source_code: str): + self.source_lines = source_code.splitlines() + self.tree = ast.parse(source_code) + self.node_parents = { + child: parent for parent in ast.walk(self.tree) for child in ast.iter_child_nodes(parent) + } + self.disabled_ranges = self._find_disabled_ranges() + + + def _find_disabled_ranges(self): + ranges = [] + in_disabled_block = False + start_line = 0 + for i, line in enumerate(self.source_lines): + if "# fmt: off" in line: + in_disabled_block = True + start_line = i + 1 + elif "# fmt: on" in line: + if in_disabled_block: + ranges.append((start_line, i + 1)) + in_disabled_block = False + return ranges + + + def _is_in_disabled_range(self, lineno): + for start, end in self.disabled_ranges: + if start <= lineno <= end: + return True + return False + + + def get_node_start_line(self, node): + if node.decorator_list: + return node.decorator_list[0].lineno + return node.lineno + + + def is_method(self, node) -> bool: + return isinstance(self.node_parents.get(node), ast.ClassDef) + + + def format(self) -> str: + nodes = {} + for node in ast.walk(self.tree): + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): + start_line = self.get_node_start_line(node) + nodes[start_line] = node + + lines = list(self.source_lines) + sorted_nodes = sorted(nodes.items(), key=lambda x: x[0], reverse=True) + + for lineno, node in sorted_nodes: + start_index = lineno - 1 + num_blank_lines = 0 + + # Skip formatting if node is inside a "fmt: off" block + if self._is_in_disabled_range(lineno): + continue + + if isinstance(node, ast.ClassDef): + num_blank_lines = 2 + elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + if self.is_method(node): + if node.name == "__init__": + num_blank_lines = 1 + else: + num_blank_lines = 2 + else: + num_blank_lines = 2 + + i = start_index - 1 + while i > 0 and not lines[i].strip(): + i -= 1 + + if i < 0: # start of file + i = -1 # will insert at 0 + + # For top-level nodes, we don't want to add spaces if it's the first thing in the file + # after imports. Let's check if there's anything but imports above. 
+ is_truly_top_level = i == -1 + if not is_truly_top_level: + # Count existing blank lines + existing_blank_lines = 0 + for k in range(start_index - 1, i, -1): + if not lines[k].strip(): + existing_blank_lines += 1 + + # Only add lines if there are not enough + if existing_blank_lines < num_blank_lines: + # remove existing blank lines + del lines[i + 1 : start_index] + # insert new blank lines + for _ in range(num_blank_lines): + lines.insert(i + 1, "") + + result = "\n".join(line.rstrip() for line in lines) + if result: + result = result.strip() + "\n" + + return result + + +def main(): + parser = argparse.ArgumentParser(description="Python custom formatter.") + parser.add_argument("files", nargs="+", type=Path) + args = parser.parse_args() + + for path in args.files: + try: + source = path.read_text() + # Skip empty files + if not source.strip(): + continue + formatter = PythonFormatter(source) + formatted_source = formatter.format() + path.write_text(formatted_source) + print(f"Formatted {path}") + except Exception as e: + print(f"Could not format {path}: {e}", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/scripts/ruff_check_format_assets.sh b/scripts/ruff_check_format_assets.sh index c141eeb..0962148 100755 --- a/scripts/ruff_check_format_assets.sh +++ b/scripts/ruff_check_format_assets.sh @@ -8,24 +8,23 @@ if [ ! -f "requirements.txt" ]; then exit 1 fi +# Check if pyproject.toml exists with ruff configuration +if [ ! -f "pyproject.toml" ]; then + echo "Error: pyproject.toml not found. Make sure it exists with proper ruff configuration" + exit 1 +fi + # Run ruff check with auto-fix first (including unsafe fixes for typing annotations) echo "Running ruff check with auto-fix..." ruff check src tests scripts --fix --unsafe-fixes --show-fixes -# Run ruff format +# Run ruff format with respect to project configuration echo "Running ruff format..." ruff format src tests scripts -# Fix SQL-specific whitespace issues after ruff (only trailing whitespace, avoid blank line removal) -echo "Fixing SQL trailing whitespace issues in BigQuery provider..." -if [[ "$OSTYPE" == "darwin"* ]]; then - # macOS - Only fix trailing whitespace after SQL keywords - find src/models -name "*.py" -type f -exec sed -i '' -E 's/([A-Z]+) +$/\1/g' {} \; -else - # Linux (CI environment) - Only fix trailing whitespace after SQL keywords - find src/models -name "*.py" -type f -exec sed -i -E 's/([A-Z]+) +$/\1/g' {} \; -fi -echo "SQL whitespace issues fixed!" +# Post-process files to ensure custom spacing rules are applied +echo "Applying custom spacing rules with custom formatter..." 
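The custom formatter invoked here honours `# fmt: off` / `# fmt: on` markers via `_find_disabled_ranges`. A small illustration of the intended usage (the file content is hypothetical):

```python
# fmt: off
# Everything in this region keeps its existing blank-line spacing: the formatter
# skips any def/class whose first line falls between the two markers.
def tightly_packed_helper():
    return 1
def another_tightly_packed_helper():
    return 2
# fmt: on


def normally_spaced_function():
    # Outside the region, two blank lines are enforced before top-level definitions.
    return 3
```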
+find src tests scripts -name "*.py" -print0 | xargs -0 python3 scripts/custom_formatter.py # Show remaining issues (mainly line length issues that need manual intervention) echo -e "\n\nRemaining issues that need manual attention:" diff --git a/src/models/bigquery_data_access_provider.py b/src/models/bigquery_data_access_provider.py index 8fd68e9..390950c 100644 --- a/src/models/bigquery_data_access_provider.py +++ b/src/models/bigquery_data_access_provider.py @@ -3,19 +3,14 @@ """ import logging -import os import socket from datetime import date from typing import cast from bigframes import pandas as bpd from pandera.typing import DataFrame -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) + +from src.utils.retry_decorator import retry_with_backoff # Module-level logger logger = logging.getLogger(__name__) @@ -24,41 +19,43 @@ class BigQueryProvider: """A class that provides read access to Google BigQuery for indexer data.""" - def __init__(self, project: str, location: str) -> None: + def __init__( + self, + project: str, + location: str, + table_name: str, + min_online_days: int, + min_subgraphs: int, + max_latency_ms: int, + max_blocks_behind: int, + ) -> None: # Configure BigQuery connection globally for all SQL queries to BigQuery bpd.options.bigquery.location = location bpd.options.bigquery.project = project bpd.options.display.progress_bar = None + self.table_name = table_name + self.min_online_days = min_online_days + self.min_subgraphs = min_subgraphs + self.max_latency_ms = max_latency_ms + self.max_blocks_behind = max_blocks_behind + - @retry( - retry=retry_if_exception_type((ConnectionError, socket.timeout)), - stop=stop_after_attempt(10), - wait=wait_exponential(multiplier=1, max=60), - reraise=True, - ) + @retry_with_backoff(max_attempts=10, min_wait=1, max_wait=60, exceptions=(ConnectionError, socket.timeout)) def _read_gbq_dataframe(self, query: str) -> DataFrame: """ Execute a read query on Google BigQuery and return the results as a pandas DataFrame. - Retries up to stop_after_attempt times on connection errors with exponential backoff. + Retries up to max_attempts times on connection errors with exponential backoff. + Note: This method uses the bigframes.pandas.read_gbq function to execute the query. It relies on Application Default Credentials (ADC) for authentication, primarily using the GOOGLE_APPLICATION_CREDENTIALS environment variable if set. This variable should point to the JSON file containing the service account key. 
""" - # Check if GOOGLE_APPLICATION_CREDENTIALS is set and valid - creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") - if creds_path: - if not os.path.exists(os.path.expanduser(creds_path)): - logger.warning(f"GOOGLE_APPLICATION_CREDENTIALS path not found: {creds_path}") - logger.warning("Falling back to gcloud CLI user credentials.") - else: - logger.info("Using enviroment variable $GOOGLE_APPLICATION_CREDENTIALS for authentication.") - else: - logger.warning("GOOGLE_APPLICATION_CREDENTIALS not set, falling back to gcloud CLI user credentials") # Execute the query with retry logic return cast(DataFrame, bpd.read_gbq(query).to_pandas()) + def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> str: """ Construct an SQL query that calculates indexer eligibility: @@ -70,9 +67,11 @@ def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> st - Blocks behind <50,000, - Subgraph has >=500 GRT signal at query time Note: The 500 GRT curation signal requirement is not currently implemented. + Args: start_date (date): The start date for the data range. end_date (date): The end date for the data range. + Returns: str: SQL query string for indexer eligibility data. """ @@ -88,14 +87,14 @@ def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> st COUNT(*) AS query_attempts, SUM(CASE WHEN status = '200 OK' - AND response_time_ms < 5000 - AND blocks_behind < 50000 + AND response_time_ms < {self.max_latency_ms} + AND blocks_behind < {self.max_blocks_behind} THEN 1 ELSE 0 END) AS good_responses, COUNT(DISTINCT deployment) AS unique_subgraphs_served FROM - internal_metrics.metrics_indexer_attempts + {self.table_name} WHERE day_partition BETWEEN '{start_date_str}' AND '{end_date_str}' GROUP BY @@ -107,7 +106,7 @@ def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> st indexer, day, unique_subgraphs_served, - CASE WHEN good_responses >= 1 AND unique_subgraphs_served >= 10 + CASE WHEN good_responses >= 1 AND unique_subgraphs_served >= {self.min_subgraphs} THEN 1 ELSE 0 END AS is_online_day FROM @@ -119,12 +118,12 @@ def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> st indexer, COUNT(DISTINCT deployment) AS unique_good_response_subgraphs FROM - internal_metrics.metrics_indexer_attempts + {self.table_name} WHERE day_partition BETWEEN '{start_date_str}' AND '{end_date_str}' AND status = '200 OK' - AND response_time_ms < 5000 - AND blocks_behind < 50000 + AND response_time_ms < {self.max_latency_ms} + AND blocks_behind < {self.max_blocks_behind} GROUP BY indexer ), @@ -153,7 +152,7 @@ def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> st total_good_days_online, unique_good_response_subgraphs, CASE - WHEN total_good_days_online >= 5 THEN 1 + WHEN total_good_days_online >= {self.min_online_days} THEN 1 ELSE 0 END AS eligible_for_indexing_rewards FROM @@ -162,16 +161,20 @@ def _get_indexer_eligibility_query(self, start_date: date, end_date: date) -> st total_good_days_online DESC, good_responses DESC """ + def fetch_indexer_issuance_eligibility_data(self, start_date: date, end_date: date) -> DataFrame: """ Fetch data from Google BigQuery, used to determine indexer issuance eligibility, and compute each indexer's issuance eligibility status. + Depends on: - _get_indexer_eligibility_query() - _read_gbq_dataframe() + Args: start_date (date): The start date for the data to fetch from BigQuery. end_date (date): The end date for the data to fetch from BigQuery. 
+ Returns: DataFrame: DataFrame containing a range of metrics for each indexer. The DataFrame contains the following columns: diff --git a/src/models/blockchain_client.py b/src/models/blockchain_client.py new file mode 100644 index 0000000..8b0e435 --- /dev/null +++ b/src/models/blockchain_client.py @@ -0,0 +1,638 @@ +""" +Blockchain client for Service Quality Oracle. + +This module handles all blockchain interactions including: +- Contract ABI loading +- RPC provider connections with failover +- Transaction building, signing, and sending +- Gas estimation and nonce management +""" + +import json +import logging +from pathlib import Path +from typing import Any, Callable, Dict, List, Tuple, cast + +from web3 import Web3 +from web3.contract import Contract +from web3.providers import HTTPProvider +from web3.types import BlockData, ChecksumAddress, HexStr + +from src.utils.key_validator import KeyValidationError, validate_and_format_private_key +from src.utils.retry_decorator import retry_with_backoff + +logger = logging.getLogger(__name__) + + +class BlockchainClient: + """Handles all blockchain interactions""" + + def __init__( + self, + rpc_providers: List[str], + contract_address: str, + project_root: Path, + block_explorer_url: str, + tx_timeout_seconds: int, + ): + """ + Initialize the blockchain client. + + Args: + rpc_providers: List of RPC provider URLs + contract_address: Smart contract address + project_root: Path to project root directory + block_explorer_url: Base URL for the block explorer (e.g., https://sepolia.arbiscan.io) + tx_timeout_seconds: Seconds to wait for a transaction receipt. + """ + self.rpc_providers = rpc_providers + self.contract_address = contract_address + self.project_root = project_root + self.block_explorer_url = block_explorer_url.rstrip("/") + self.tx_timeout_seconds = tx_timeout_seconds + self.contract_abi = self._load_contract_abi() + + + def _load_contract_abi(self) -> List[Dict]: + """Load the contract ABI from the contracts directory.""" + # Try to load the ABI file + try: + abi_path = self.project_root / "contracts" / "contract.abi.json" + with open(abi_path) as f: + return json.load(f) + + # If the ABI file cannot be loaded, raise an error + except Exception as e: + logger.error(f"Failed to load contract ABI: {str(e)}") + raise + + + @retry_with_backoff(max_attempts=3, exceptions=(ConnectionError,)) + def _get_working_web3_connection( + self, rpc_providers: List[str], contract_address: str, contract_abi: List[Dict] + ) -> Tuple[Web3, Contract, str]: + """ + Try connecting to RPC providers until one works. 
+ + Args: + rpc_providers: List of RPC provider URLs to try connecting to + contract_address: Contract address for creating contract instance + contract_abi: Contract ABI for creating contract instance + + Returns: + Tuple[Web3, Contract, str]: Working web3 instance, contract instance, and provider URL + + Raises: + ConnectionError: If all RPC providers fail + """ + # Try to connect to each RPC provider in sequence + for i, rpc_url in enumerate(rpc_providers): + try: + provider_type = "primary" if i == 0 else f"backup #{i}" + logger.info(f"Attempting to connect to {provider_type} RPC provider: {rpc_url}") + w3 = Web3(Web3.HTTPProvider(rpc_url)) + + # Test connection + if w3.is_connected(): + logger.info(f"Successfully connected to {provider_type} RPC provider") + # Create contract instance and return web3 instance, contract instance, and provider URL + contract = w3.eth.contract( + address=Web3.to_checksum_address(contract_address), abi=contract_abi + ) + + # + return w3, contract, rpc_url + + # If we could not connect log the error + else: + logger.warning(f"Could not connect to {provider_type} RPC provider: {rpc_url}") + + # If we get an error, log the error + except Exception as e: + provider_type = "primary" if i == 0 else f"backup #{i}" + logger.warning(f"Error connecting to {provider_type} RPC provider {rpc_url}: {str(e)}") + + # If we get here, all providers failed + raise ConnectionError(f"Failed to connect to any of {len(rpc_providers)} RPC providers: {rpc_providers}") + + + def _setup_transaction_account(self, private_key: str) -> Tuple[str, str]: + """ + Validate the private key and return the formatted key and account address. + + Args: + private_key: The private key string. + + Returns: + A tuple containing the account address and the formatted private key. + + Raises: + KeyValidationError: If the private key is invalid. + """ + try: + formatted_key = validate_and_format_private_key(private_key) + account = Web3().eth.account.from_key(formatted_key) + logger.info(f"Using account: {account.address}") + return account.address, formatted_key + + except KeyValidationError as e: + logger.error(f"Invalid private key provided: {e}") + raise + + except Exception as e: + logger.error(f"Failed to retrieve account from private key: {str(e)}") + raise + + + def _estimate_transaction_gas( + self, + w3: Web3, + contract_func: Any, + indexer_addresses: List[str], + data_bytes: bytes, + sender_address: ChecksumAddress, + ) -> int: + """ + Estimate gas for the transaction with 25% buffer. + + Args: + w3: Web3 instance + contract_func: Contract function to call + indexer_addresses: List of indexer addresses + data_bytes: Data bytes for the transaction + sender_address: Transaction sender address + + Returns: + int: Estimated gas with 25% buffer + """ + # Try to estimate the gas for the transaction + try: + estimated_gas = contract_func(indexer_addresses, data_bytes).estimate_gas({"from": sender_address}) + gas_limit = int(estimated_gas * 1.25) # 25% buffer + logger.info(f"Estimated gas: {estimated_gas}, with buffer: {gas_limit}") + return gas_limit + + # If the gas estimation fails, log the error and raise an exception + except Exception as e: + logger.error(f"Gas estimation failed: {str(e)}") + raise + + + def _determine_transaction_nonce(self, w3: Web3, sender_address: ChecksumAddress, replace: bool) -> int: + """ + Determine the appropriate nonce for the transaction. 
+ + Args: + w3: Web3 instance + sender_address: Transaction sender address + replace: Whether to replace pending transactions + + Returns: + int: Transaction nonce to use + """ + # If we are not replacing a pending transaction, use the next available nonce + if not replace: + nonce = w3.eth.get_transaction_count(sender_address) + logger.info(f"Using next available nonce: {nonce}") + return nonce + + # If we are replacing a pending transaction, try to find and replace it + logger.info("Attempting to find and replace a pending transaction") + + # Try to find pending transactions + try: + pending_txs_data = w3.eth.get_block("pending", full_transactions=True) + pending_txs = cast(BlockData, pending_txs_data) + sender_pending_txs = [ + tx + for tx in pending_txs["transactions"] + if isinstance(tx, dict) and tx.get("from") == sender_address + ] + + # If we found pending transactions, use the nonce of the first pending transaction + if sender_pending_txs: + sender_pending_txs.sort(key=lambda x: x["nonce"]) + nonce = sender_pending_txs[0]["nonce"] + logger.info(f"Found pending transaction with nonce {nonce} for replacement") + return nonce + + # If we could not find pending transactions log the issue + except Exception as e: + logger.warning(f"Could not check pending transactions: {str(e)}") + + # Check for nonce gaps + try: + current_nonce = w3.eth.get_transaction_count(sender_address, "pending") + latest_nonce = w3.eth.get_transaction_count(sender_address, "latest") + if current_nonce > latest_nonce: + logger.info(f"Detected nonce gap: latest={latest_nonce}, pending={current_nonce}") + return latest_nonce + + # If we could not check nonce gaps log the issue + except Exception as e: + logger.warning(f"Could not check nonce gap: {str(e)}") + + # Fallback to next available nonce + nonce = w3.eth.get_transaction_count(sender_address) + logger.info(f"Using next available nonce: {nonce}") + return nonce + + + def _get_gas_prices(self, w3: Web3, replace: bool) -> Tuple[int, int]: + """Get base fee and max priority fee for transaction.""" + # Get current gas prices with detailed logging + try: + latest_block_data = w3.eth.get_block("latest") + latest_block = cast(BlockData, latest_block_data) + base_fee_hex = latest_block["baseFeePerGas"] + base_fee = int(base_fee_hex) + logger.info(f"Latest block base fee: {base_fee/1e9:.2f} gwei") + + # If the base fee cannot be retrieved, use a fallback value + except Exception as e: + logger.warning(f"Could not get base fee: {e}") + base_fee = w3.to_wei(10, "gwei") + + # Try to get the max priority fee + try: + max_priority_fee = w3.eth.max_priority_fee + logger.info(f"Max priority fee: {max_priority_fee/1e9:.2f} gwei") + + # If the max priority fee cannot be retrieved, use a fallback value + except Exception as e: + logger.warning(f"Could not get max priority fee: {e}") + max_priority_fee = w3.to_wei(2, "gwei") # fallback + + # Return the base fee and max priority fee + return base_fee, max_priority_fee + + + def _build_transaction_params( + self, + sender_address: ChecksumAddress, + nonce: int, + chain_id: int, + gas_limit: int, + base_fee: int, + max_priority_fee: int, + replace: bool, + ) -> Dict: + """Build transaction parameters with appropriate gas prices.""" + tx_params = {"from": sender_address, "nonce": nonce, "chainId": chain_id, "gas": gas_limit} + + # Set gas prices (higher for replacement transactions) + if replace: + max_fee_per_gas = base_fee * 4 + max_priority_fee * 2 + max_priority_fee_per_gas = max_priority_fee * 2 + tx_params["maxFeePerGas"] = 
max_fee_per_gas + tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas + logger.info(f"High gas for replacement: {max_fee_per_gas/1e9:.2f} gwei") + + # If we are not replacing a pending transaction, use a lower gas price + else: + max_fee_per_gas = base_fee * 2 + max_priority_fee + max_priority_fee_per_gas = max_priority_fee + tx_params["maxFeePerGas"] = max_fee_per_gas + tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas + logger.info(f"Standard gas: {max_fee_per_gas/1e9:.2f} gwei") + + logger.info(f"Transaction parameters: nonce={nonce}, gas={gas_limit}, chain_id={chain_id}") + return tx_params + + + def _build_and_sign_transaction( + self, + w3: Web3, + contract_func: Any, + indexer_addresses: List[str], + data_bytes: bytes, + tx_params: Dict, + private_key: str, + ): + """Build and sign the transaction.""" + # Attempt to build the transaction + try: + transaction = contract_func(indexer_addresses, data_bytes).build_transaction(tx_params) + logger.info("Transaction built successfully") + + # If the transaction cannot be built, log the error and raise an exception + except Exception as e: + logger.error(f"Failed to build transaction: {e}") + logger.error(f"Contract function: {contract_func}") + logger.error(f"Indexer addresses count: {len(indexer_addresses)}") + logger.error(f"Data bytes length: {len(data_bytes)}") + logger.error(f"Transaction params: {tx_params}") + raise + + # Attempt to sign the transaction + try: + signed_tx = w3.eth.account.sign_transaction(transaction, private_key) + logger.info("Transaction signed successfully") + return signed_tx + + # If the transaction cannot be signed, log the error and raise an exception + except Exception as e: + logger.error(f"Failed to sign transaction: {e}") + raise + + + def _handle_transaction_error(self, error_msg: str) -> None: + """Handle and log specific transaction error types.""" + # If the error message contains "insufficient funds", log the error + if "insufficient funds" in error_msg.lower(): + logger.error("Insufficient funds to pay for gas") + + # If the error message contains "nonce too low", log the error + elif "nonce too low" in error_msg.lower(): + logger.error("Nonce is too low - transaction may have already been sent") + + # If the error message contains "nonce too high", log the error + elif "nonce too high" in error_msg.lower(): + logger.error("Nonce is too high - there may be pending transactions") + + # If the error message contains "gas", log the error + elif "gas" in error_msg.lower(): + logger.error("Gas-related issue - transaction may consume too much gas") + + # If the error message contains "400", log the error + elif "400" in error_msg: + logger.error("HTTP 400 Bad Request - RPC provider rejected the request") + + + def _send_signed_transaction(self, w3: Web3, signed_tx: Any) -> str: + """Send the signed transaction and handle errors.""" + # Attempt to send the transaction to the network + try: + tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction) + logger.info(f"Transaction sent! 
Hash: {tx_hash.hex()}") + return tx_hash.hex() + + # If the transaction could not be sent, log the error and raise an exception + except ValueError as e: + error_msg = str(e) + logger.error(f"Transaction rejected by network: {error_msg}") + self._handle_transaction_error(error_msg) + raise + + # If we get an unexpected error, log the error and raise an exception + except Exception as e: + logger.error(f"Unexpected error sending transaction: {e}") + logger.error(f"Error type: {type(e).__name__}") + raise + + + def _execute_complete_transaction(self, w3: Web3, contract: Contract, params: Dict) -> str: + """ + Execute the complete transaction process using a single RPC connection. + + Args: + w3: Web3 instance + contract: Contract instance + params: Dictionary containing all transaction parameters + + Returns: + str: Transaction hash + """ + # Extract parameters + private_key = params["private_key"] + contract_function = params["contract_function"] + indexer_addresses = params["indexer_addresses"] + data_bytes = params["data_bytes"] + sender_address_str = params["sender_address"] + chain_id = params["chain_id"] + replace = params["replace"] + sender_address = Web3.to_checksum_address(sender_address_str) + + # Validate contract function exists + if not hasattr(contract.functions, contract_function): + raise ValueError(f"Contract {contract.address} does not have function: {contract_function}") + + contract_func = getattr(contract.functions, contract_function) + + # Log transaction details + logger.info(f"Contract address: {contract.address}") + logger.info(f"Contract function: {contract_function}") + logger.info(f"Number of indexers: {len(indexer_addresses)}") + logger.info(f"Data bytes length: {len(data_bytes)}") + logger.info(f"Chain ID: {chain_id}") + logger.info(f"Sender address: {sender_address}") + if isinstance(w3.provider, HTTPProvider): + logger.info(f"Using RPC: {w3.provider.endpoint_uri}") + + # Check account balance + balance_wei = w3.eth.get_balance(sender_address) + balance_eth = w3.from_wei(balance_wei, "ether") + logger.info(f"Account balance: {balance_eth} ETH") + + try: + # 1. Estimate gas + gas_limit = self._estimate_transaction_gas( + w3, contract_func, indexer_addresses, data_bytes, sender_address + ) + + # 2. Determine nonce + nonce = self._determine_transaction_nonce(w3, sender_address, replace) + + # 3. Get gas prices + base_fee, max_priority_fee = self._get_gas_prices(w3, replace) + + # 4. Build transaction parameters + tx_params = self._build_transaction_params( + sender_address, nonce, chain_id, gas_limit, base_fee, max_priority_fee, replace + ) + + # 5. Build and sign transaction + signed_tx = self._build_and_sign_transaction( + w3, contract_func, indexer_addresses, data_bytes, tx_params, private_key + ) + + # 6. 
Send transaction + tx_hash = self._send_signed_transaction(w3, signed_tx) + + except Exception as e: + logger.error(f"Transaction execution failed: {e}", exc_info=True) + raise + + # Wait for receipt with the same connection + try: + tx_receipt = w3.eth.wait_for_transaction_receipt(HexStr(tx_hash), timeout=self.tx_timeout_seconds) + if tx_receipt["status"] == 1: + logger.info( + f"Transaction confirmed in block {tx_receipt['blockNumber']}, " + f"gas used: {tx_receipt['gasUsed']}" + ) + else: + logger.error(f"Transaction failed on-chain: {tx_hash}") + except Exception as e: + logger.warning(f"Could not get transaction receipt: {str(e)} (transaction may still be pending)") + + return tx_hash + + + def _execute_transaction_with_rpc_failover( + self, operation_name: str, operation_func: Callable, operation_params: Dict + ) -> Any: + """ + Execute a transaction operation with automatic RPC failover. + This function tries each RPC provider in sequence until one succeeds. + + Args: + operation_name: Human-readable name for the transaction operation + operation_func: Function that takes (w3, contract, operation_params) and executes the operation + operation_params: Parameters for the operation + + Returns: + Result of the operation_func + + Raises: + Exception: If all RPC providers fail + """ + # Initialize last_exception to None + last_exception = None + + # Try each RPC provider in sequence + for rpc_url in self.rpc_providers: + try: + # Log the attempt + logger.info(f"Attempting to do '{operation_name}' using RPC provider: {rpc_url}") + + # Get fresh connection for this rpc provider attempt + w3, contract, _ = self._get_working_web3_connection( + [rpc_url], self.contract_address, self.contract_abi + ) + + # Execute the operation with this rpc provider and return the result + return operation_func(w3, contract, operation_params) + + # If the operation fails, log the error and continue to the next rpc provider + except Exception as e: + logger.warning(f"{operation_name} failed with RPC provider {rpc_url}: {str(e)}") + last_exception = e + + # If we get here, all providers failed + logger.error(f"{operation_name} failed on all {len(self.rpc_providers)} RPC providers") + raise last_exception or Exception(f"All RPC providers failed for {operation_name}") + + + def send_transaction_to_allow_indexers( + self, + indexer_addresses: List[str], + private_key: str, + chain_id: int, + contract_function: str, + replace: bool = False, + data_bytes: bytes = b"", + ) -> str: + """ + Send a transaction to allow a subset of indexers to claim issuance rewards. 
+ + Args: + indexer_addresses: List of indexer addresses to allow issuance + private_key: Private key for transaction signing + chain_id: Chain ID of the target blockchain + contract_function: Contract function name to call + replace: Flag to replace pending transactions + data_bytes: Optional bytes data to pass to contract function + + Returns: + str: Transaction hash + """ + # Set up account and validate private key + sender_address, formatted_private_key = self._setup_transaction_account(private_key) + + # Convert addresses to checksum format + checksum_addresses = [Web3.to_checksum_address(addr) for addr in indexer_addresses] + + # Prepare all parameters for the transaction + transaction_params = { + "private_key": formatted_private_key, + "contract_function": contract_function, + "indexer_addresses": checksum_addresses, + "data_bytes": data_bytes, + "sender_address": sender_address, + "chain_id": chain_id, + "replace": replace, + } + + # Execute the transaction with RPC failover + try: + return self._execute_transaction_with_rpc_failover( + "Allow indexers to claim issuance", + self._execute_complete_transaction, + transaction_params, + ) + except Exception as e: + logger.error(f"Transaction failed on all RPC providers: {str(e)}") + raise + + + def batch_allow_indexers_issuance_eligibility( + self, + indexer_addresses: List[str], + private_key: str, + chain_id: int, + contract_function: str, + batch_size: int, + replace: bool = False, + data_bytes: bytes = b"", + ) -> List[str]: + """ + Allow the issuance eligibility status of a list of indexers in batches. + + Args: + indexer_addresses: List of indexer addresses to allow + private_key: Private key for transaction signing + chain_id: Chain ID of the target blockchain + contract_function: Contract function name to call + batch_size: Optional batch size for processing large lists + replace: Optional flag to replace pending transactions + data_bytes: Optional bytes data to pass to contract function + + Returns: + List[str]: List of transaction hashes from successful batches + """ + # Validate function parameters + if not indexer_addresses: + logger.warning("No indexers provided to allow. 
Returning empty list.") + return [] + if batch_size <= 0: + raise ValueError("batch_size must be positive") + + # Calculate number of batches to process + total_indexers = len(indexer_addresses) + num_batches = (total_indexers + batch_size - 1) // batch_size + logger.info(f"Processing {total_indexers} indexers in {num_batches} batch(es) of {batch_size}") + + tx_links = [] + + # Process each batch + for i in range(num_batches): + start_idx = i * batch_size + end_idx = min(start_idx + batch_size, total_indexers) + batch_indexers = indexer_addresses[start_idx:end_idx] + + logger.info(f"Processing batch {i+1}/{num_batches} with {len(batch_indexers)} indexers") + + # Try to send the transaction to the network (uses RPC failover) + try: + tx_hash = self.send_transaction_to_allow_indexers( + batch_indexers, + private_key, + chain_id, + contract_function, + replace, + data_bytes, + ) + tx_links.append(f"{self.block_explorer_url}/tx/{tx_hash}") + logger.info(f"Batch {i+1} transaction successful: {tx_hash}") + + # If we get an error, log the error and raise an exception + except Exception as e: + logger.error(f"Error processing batch {i+1} due to: {e}") + raise + + # Log all transaction links + for i, tx_link in enumerate(tx_links, 1): + logger.info(f"Transaction link {i} of {len(tx_links)}: {tx_link}") + + return tx_links diff --git a/src/models/eligibility_pipeline.py b/src/models/eligibility_pipeline.py new file mode 100644 index 0000000..4aa87b1 --- /dev/null +++ b/src/models/eligibility_pipeline.py @@ -0,0 +1,216 @@ +""" +Eligibility pipeline module for the Service Quality Oracle. + +This module contains the logic for processing raw BigQuery data into a list of eligible indexers. It handles: +- Parsing and filtering of indexer performance data. +- Generation of CSV files for record-keeping. +- Cleanup of old data. +""" + +import logging +import shutil +from datetime import date, datetime +from pathlib import Path +from typing import List, Tuple + +import pandas as pd + +logger = logging.getLogger(__name__) + + +class EligibilityPipeline: + """Handles the data processing pipeline and file management operations.""" + + def __init__(self, project_root: Path): + """ + Initialize the eligibility pipeline. + + Args: + project_root: Path to project root directory + """ + # Set the project root and output directory + self.project_root = project_root + self.output_dir = project_root / "data" / "output" + + + def process(self, input_data_from_bigquery: pd.DataFrame, current_date: date) -> Tuple[List[str], List[str]]: + """ + Process raw BigQuery data to generate data and return eligible indexer lists. + + Args: + input_data_from_bigquery: DataFrame from BigQuery. + current_date: The date of the current run, used for creating the output directory. + + Returns: + Tuple[List[str], List[str]]: Two lists of indexer addresses, eligible and ineligible + """ + # 1. Validate the structure of the input data + required_cols = ["indexer", "eligible_for_indexing_rewards"] + self.validate_dataframe_structure(input_data_from_bigquery, required_cols) + + # 2. Filter data into eligible and ineligible groups + eligible_df = input_data_from_bigquery[ + input_data_from_bigquery["eligible_for_indexing_rewards"] == 1 + ].copy() + + ineligible_df = input_data_from_bigquery[ + input_data_from_bigquery["eligible_for_indexing_rewards"] == 0 + ].copy() + + # 3. 
Generate and save files + output_date_dir = self.get_date_output_directory(current_date) + self._generate_files(input_data_from_bigquery, eligible_df, ineligible_df, output_date_dir) + + # 4. Return the lists of indexers + return eligible_df["indexer"].tolist(), ineligible_df["indexer"].tolist() + + + def _generate_files( + self, raw_data: pd.DataFrame, eligible_df: pd.DataFrame, ineligible_df: pd.DataFrame, output_date_dir: Path + ) -> None: + """ + Save the raw and filtered dataframes to CSV files in a date-specific directory. + - indexer_issuance_eligibility_data.csv (raw data) + - eligible_indexers.csv (only eligible indexer addresses) + - ineligible_indexers.csv (only ineligible indexer addresses) + + Args: + raw_data: The input DataFrame containing all indexer data. + eligible_df: DataFrame containing only eligible indexers. + ineligible_df: DataFrame containing only ineligible indexers. + output_date_dir: The directory where files will be saved. + """ + # Ensure the output directory exists, creating parent directories if necessary + output_date_dir.mkdir(exist_ok=True, parents=True) + + # Save raw data for internal use + raw_data_path = output_date_dir / "indexer_issuance_eligibility_data.csv" + raw_data.to_csv(raw_data_path, index=False) + logger.info(f"Saved raw BigQuery results to: {raw_data_path}") + + # Save filtered data + eligible_path = output_date_dir / "eligible_indexers.csv" + ineligible_path = output_date_dir / "ineligible_indexers.csv" + + eligible_df[["indexer"]].to_csv(eligible_path, index=False) + ineligible_df[["indexer"]].to_csv(ineligible_path, index=False) + + logger.info(f"Saved {len(eligible_df)} eligible indexers to: {eligible_path}") + logger.info(f"Saved {len(ineligible_df)} ineligible indexers to: {ineligible_path}") + + + def clean_old_date_directories(self, max_age_before_deletion: int) -> None: + """ + Remove old date directories to prevent unlimited growth. + + Args: + max_age_before_deletion: Maximum age in days before deleting data output + """ + today = date.today() + + # Check if the output directory exists + if not self.output_dir.exists(): + logger.warning(f"Output directory does not exist: {self.output_dir}") + return + + directories_removed = 0 + + # Only process directories with date format YYYY-MM-DD + for item in self.output_dir.iterdir(): + if not item.is_dir(): + continue + + try: + # Try to parse the directory name as a date + dir_date = datetime.strptime(item.name, "%Y-%m-%d").date() + age_days = (today - dir_date).days + + # Remove if older than max_age_before_deletion + if age_days > max_age_before_deletion: + logger.info(f"Removing old data directory: {item} ({age_days} days old)") + shutil.rmtree(item) + directories_removed += 1 + + except ValueError: + # Skip directories that don't match date format + logger.debug(f"Skipping non-date directory: {item.name}") + continue + + if directories_removed > 0: + logger.info(f"Removed {directories_removed} old data directories") + else: + logger.info("No old data directories found to remove") + + + def get_date_output_directory(self, current_date: date) -> Path: + """ + Get the output directory path for a specific date. + + Args: + current_date: Date for which to get the output directory + + Returns: + Path: Path to the date-specific output directory + """ + return self.output_dir / current_date.strftime("%Y-%m-%d") + + + def validate_dataframe_structure(self, df: pd.DataFrame, required_columns: List[str]) -> bool: + """ + Validate that a DataFrame has the required columns. 
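As a usage illustration of the pipeline above (the addresses, the temporary project root, and the surrounding script are hypothetical; the column names and return shape come from `process()` as defined in this diff):

```python
from datetime import date
from pathlib import Path

import pandas as pd

from src.models.eligibility_pipeline import EligibilityPipeline

# Minimal input containing only the two columns that process() validates for.
df = pd.DataFrame(
    {
        "indexer": ["0x" + "a" * 40, "0x" + "b" * 40, "0x" + "c" * 40],
        "eligible_for_indexing_rewards": [1, 0, 1],
    }
)

pipeline = EligibilityPipeline(project_root=Path("/tmp/oracle-example"))  # hypothetical path
eligible, ineligible = pipeline.process(df, current_date=date.today())

# eligible holds the first and third address, ineligible the second; the raw data and both
# filtered lists are written as CSVs under /tmp/oracle-example/data/output/<YYYY-MM-DD>/.
print(len(eligible), len(ineligible))
```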
+ + Args: + df: DataFrame to validate + required_columns: List of required column names + + Returns: + bool: True if all required columns are present + + Raises: + ValueError: If required columns are missing + """ + # Check if any required columns are missing + missing_columns = [col for col in required_columns if col not in df.columns] + + # If any required columns are missing, raise an error + if missing_columns: + raise ValueError( + f"DataFrame missing required columns: {missing_columns}. " f"Found columns: {list(df.columns)}" + ) + + # If all required columns are present, return True + return True + + + def get_directory_size_info(self) -> dict: + """ + Get information about the output directory size and file counts. + + Returns: + dict: Information about directory size and contents + """ + # If the directory doesn't exist, return a dictionary with 0 values + if not self.output_dir.exists(): + return {"exists": False, "total_size_bytes": 0, "directory_count": 0, "file_count": 0} + + total_size = 0 + file_count = 0 + directory_count = 0 + + # Get the total size of the directory and the number of files and directories + for item in self.output_dir.rglob("*"): + if item.is_file(): + total_size += item.stat().st_size + file_count += 1 + elif item.is_dir(): + directory_count += 1 + + # Return the information about the directory size and contents + return { + "exists": True, + "total_size_bytes": total_size, + "total_size_mb": round(total_size / (1024 * 1024), 2), + "directory_count": directory_count, + "file_count": file_count, + "path": str(self.output_dir), + } diff --git a/src/models/issuance_data_access_helper.py b/src/models/issuance_data_access_helper.py deleted file mode 100644 index ca659ba..0000000 --- a/src/models/issuance_data_access_helper.py +++ /dev/null @@ -1,937 +0,0 @@ -""" -Helper module containing utility functions related to data access and processing -for the Service Quality Oracle. -""" - -import json -import logging -import os -import shutil -from datetime import date, datetime -from pathlib import Path -from typing import Any - -import pandas as pd -from tenacity import retry, stop_after_attempt, wait_exponential -from web3 import Web3 -from web3.contract import Contract - -# Import data providers -from src.models.bigquery_data_access_provider import BigQueryProvider -from src.models.subgraph_data_access_provider import SubgraphProvider - -# Import configuration and key validation -from src.utils.config_loader import ConfigLoader, ConfigurationError -from src.utils.key_validator import KeyValidationError, validate_and_format_private_key - -logger = logging.getLogger(__name__) - - -# ============================================================================= -# CONFIGURATION AND SETUP FUNCTIONS -# ============================================================================= -def _validate_required_fields(data: dict, required_fields: list[str], context: str) -> None: - """ - Helper function to validate required fields are present in a dictionary. 
- Args: - data: Dictionary to validate - required_fields: List of required fields - context: Context for error message - Raises: - ValueError: If required fields are missing - """ - # Check if any required fields are missing from the data dictionary - missing_fields = [field for field in required_fields if field not in data] - if missing_fields: - raise ValueError(f"{context}: missing {missing_fields}") - - -def _load_config_and_return_validated() -> dict[str, Any]: - """ - Load all necessary configurations using config loader, validate, and return them. - # TODO: check config file return dict format correct (also in other functions throughout the codebase) - Returns: - Dict[str, Any]: Config dictionary with validated and converted values. - { - "bigquery_project_id": str, - "bigquery_location": str, - "rpc_providers": list[str], - "contract_address": str, - "contract_function": str, - "chain_id": int, - "scheduled_run_time": str, - "batch_size": int, - "max_age_before_deletion": int, - } - Raises: - ConfigurationError: If configuration loading fails - ValueError: If configuration validation fails - """ - try: - # Load configuration using config loader - loader = ConfigLoader() - config = loader.get_flat_config() - logger.info("Successfully loaded configuration") - # Validate and convert chain_id to integer - if config.get("chain_id"): - try: - config["chain_id"] = int(config["chain_id"]) - except ValueError as e: - raise ValueError(f"Invalid BLOCKCHAIN_CHAIN_ID: {config['chain_id']} - must be an integer.") from e - # Validate scheduled run time format (HH:MM) - if config.get("scheduled_run_time"): - try: - datetime.strptime(config["scheduled_run_time"], "%H:%M") - except ValueError as e: - raise ValueError( - f"Invalid SCHEDULED_RUN_TIME format: {config['scheduled_run_time']} - " - "must be in HH:MM format" - ) from e - # Validate blockchain configuration contains all required fields - required_fields = [ - "private_key", - "contract_address", - "contract_function", - "chain_id", - "scheduled_run_time", - ] - _validate_required_fields(config, required_fields, "Missing required blockchain configuration") - # Validate RPC providers - if not config.get("rpc_providers") or not isinstance(config["rpc_providers"], list): - raise ValueError("BLOCKCHAIN_RPC_URLS must be a list of valid RPC URLs") - return config - except ConfigurationError: - raise - except Exception as e: - raise ConfigurationError(f"Configuration validation failed: {e}") from e - - -def _get_path_to_project_root() -> Path: - """ - Get the path to the project root directory. - In Docker environments, use /app. Otherwise, find by marker files. - """ - # Use the /app directory as the project root if it exists - docker_path = Path("/app") - if docker_path.exists(): - return docker_path - # If the /app directory doesn't exist fall back to secondary detection logic - current_path = Path(__file__).parent - while current_path != current_path.parent: - if (current_path / ".gitignore").exists() or (current_path / "pyproject.toml").exists(): - logger.info(f"Found project root at: {current_path}") - return current_path - # Attempt to traverse upwards (will not work if the directory has no parent) - current_path = current_path.parent - # If we got here, something is wrong - raise FileNotFoundError("Could not find project root directory. Investigate.") - - -def _parse_and_validate_credentials_json(creds_env: str) -> dict: - """ - Parse and validate Google credentials JSON from environment variable. 
- Args: - creds_env: JSON string containing credentials - Returns: - dict: Parsed and validated credentials data - Raises: - ValueError: If JSON is invalid or credentials are incomplete - """ - # Try to parse the credentials JSON - try: - creds_data = json.loads(creds_env) - cred_type = creds_data.get("type", "") - # Validate the credentials data based on the type - if cred_type == "authorized_user": - required_fields = ["client_id", "client_secret", "refresh_token"] - _validate_required_fields(creds_data, required_fields, "Incomplete authorized_user credentials") - elif cred_type == "service_account": - required_fields = ["private_key", "client_email", "project_id"] - _validate_required_fields(creds_data, required_fields, "Incomplete service_account credentials") - else: - raise ValueError( - f"Unsupported credential type: '{cred_type}'. Expected 'authorized_user' or 'service_account'" - ) - # If the JSON is invalid, log an error and raise a ValueError - except Exception as e: - logger.error(f"Failed to parse and validate credentials JSON: {e}") - raise ValueError(f"Invalid credentials JSON: {e}") from e - # Return the parsed and validated credentials data - return creds_data - - -def _setup_user_credentials_in_memory(creds_data: dict) -> None: - """Set up user account credentials directly in memory.""" - import google.auth - from google.oauth2.credentials import Credentials - - try: - credentials = Credentials( - token=None, - refresh_token=creds_data.get("refresh_token"), - client_id=creds_data.get("client_id"), - client_secret=creds_data.get("client_secret"), - token_uri="https://oauth2.googleapis.com/token", - ) - # Set credentials globally for GCP libraries - google.auth._default._CREDENTIALS = credentials # type: ignore[attr-defined] - logger.info("Successfully loaded user account credentials from environment variable") - finally: - # Clear sensitive data from local scope - if "creds_data" in locals(): - creds_data.clear() - - -def _setup_service_account_credentials_in_memory(creds_data: dict) -> None: - """Set up service account credentials directly in memory.""" - import google.auth - from google.oauth2 import service_account - - try: - # Create credentials object directly from dict - credentials = service_account.Credentials.from_service_account_info(creds_data) - # Set credentials globally for GCP libraries - google.auth._default._CREDENTIALS = credentials # type: ignore[attr-defined] - logger.info("Successfully loaded service account credentials from environment variable") - except Exception as e: - logger.error(f"Failed to create service account credentials: {e}") - raise ValueError(f"Invalid service account credentials: {e}") from e - finally: - # Clear sensitive data from local scope - if "creds_data" in locals(): - creds_data.clear() - - -def _setup_google_credentials_in_memory_from_env_var(): - """ - Set up Google credentials directly in memory from environment variable. - This function handles multiple credential formats securely: - 1. JSON string in GOOGLE_APPLICATION_CREDENTIALS (inline credentials) - 2. Automatic fallback to gcloud CLI authentication - """ - # Get the account credentials from the environment variable - creds_env = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") - # If the credentials are not set, log a warning and return - if not creds_env: - logger.warning( - "GOOGLE_APPLICATION_CREDENTIALS not set. 
Falling back to gcloud CLI user credentials if available" - ) - return - # Case 1: JSON credentials provided inline - if creds_env.startswith("{"): - creds_data = None - try: - # Parse and validate credentials - creds_data = _parse_and_validate_credentials_json(creds_env) - cred_type = creds_data.get("type") - # Set up credentials based on type - if cred_type == "authorized_user": - _setup_user_credentials_in_memory(creds_data.copy()) - elif cred_type == "service_account": - _setup_service_account_credentials_in_memory(creds_data.copy()) - # If the credentials are invalid, log an error and raise a ValueError - except Exception as e: - logger.error("Failed to set up credentials from environment variable") - raise ValueError(f"Error processing inline credentials: {e}") from e - # Clear the original credentials dict from memory if it exists - finally: - if creds_data is not None: - creds_data.clear() - del creds_data - else: - logger.warning( - "GOOGLE_APPLICATION_CREDENTIALS is not set or not in the correct format. " - "Falling back to gcloud CLI authentication if available" - ) - return - - -# ============================================================================= -# DATA PROCESSING UTILITY FUNCTIONS -# ============================================================================= -def _export_bigquery_data_as_csvs_and_return_lists_of_ineligible_and_eligible_indexers( - input_data_from_bigquery: pd.DataFrame, output_date_dir: Path -) -> tuple[list, list]: - """ - Export BigQuery data as CSVs and return lists of eligible/ineligible indexers. - Args: - input_data_from_bigquery: Indexer data returned from BigQuery - output_date_dir: Path to date directory for output files - Returns: - Tuple[list, list]: Two lists of indexer addresses, eligible and ineligible - """ - # Ensure the output directory exists, creating parent directories if necessary - output_date_dir.mkdir(exist_ok=True, parents=True) - # Save raw data - raw_data_path = output_date_dir / "indexer_issuance_eligibility_data.csv" - input_data_from_bigquery.to_csv(raw_data_path, index=False) - logger.info(f"Saved raw bigquery results df to: {raw_data_path}") - # Filter eligible and ineligible indexers - eligible_df = input_data_from_bigquery[input_data_from_bigquery["eligible_for_indexing_rewards"] == 1] - ineligible_df = input_data_from_bigquery[input_data_from_bigquery["eligible_for_indexing_rewards"] == 0] - # Save filtered data - eligible_path = output_date_dir / "eligible_indexers.csv" - ineligible_path = output_date_dir / "ineligible_indexers.csv" - eligible_df[["indexer"]].to_csv(eligible_path, index=False) - ineligible_df[["indexer"]].to_csv(ineligible_path, index=False) - # Return lists of eligible and ineligible indexers - return eligible_df["indexer"].tolist(), ineligible_df["indexer"].tolist() - - -def _clean_old_date_directories(data_output_dir: Path, max_age_before_deletion: int): - """ - Remove old date directories to prevent unlimited growth. 
- Args: - data_output_dir: Path to the output directory - max_age_before_deletion: Maximum age in days before deleting data output - """ - today = date.today() - output_path = Path(data_output_dir) - # Only process directories with date format YYYY-MM-DD - for item in output_path.iterdir(): - if not item.is_dir(): - continue - try: - # Try to parse the directory name as a date - dir_date = datetime.strptime(item.name, "%Y-%m-%d").date() - age_days = (today - dir_date).days - # Remove if older than max_age_before_deletion - if age_days > max_age_before_deletion: - logger.info(f"Removing old data directory: {item} ({age_days} days old)") - shutil.rmtree(item) - # Skip directories that don't match date format - except ValueError: - continue - - -# ============================================================================= -# BLOCKCHAIN UTILITY FUNCTIONS (LOW-LEVEL) -# ============================================================================= -def _load_contract_abi() -> list[dict]: - """Load the contract ABI from the contracts directory.""" - try: - project_root = _get_path_to_project_root() - abi_path = project_root / "contracts" / "contract.abi.json" - with open(abi_path) as f: - return json.load(f) - # If the ABI file cannot be loaded, raise an error - except Exception as e: - logger.error(f"Failed to load contract ABI: {str(e)}") - raise - - -def _get_working_web3_connection( - rpc_providers: list[str], contract_address: str, contract_abi: list[dict] -) -> tuple[Web3, Contract, str]: - """ - Try connecting to RPC providers until one works. - Args: - rpc_providers: List of RPC provider URLs to try connecting to - contract_address: Contract address for creating contract instance - contract_abi: Contract ABI for creating contract instance - Returns: - Tuple[Web3, Contract, str]: Working web3 instance, contract instance, and provider URL - Raises: - ConnectionError: If all RPC providers fail - """ - for i, rpc_url in enumerate(rpc_providers): - try: - provider_type = "primary" if i == 0 else f"backup #{i}" - logger.info(f"Attempting to connect to {provider_type} RPC provider: {rpc_url}") - w3 = Web3(Web3.HTTPProvider(rpc_url)) - # Test connection - if w3.is_connected(): - logger.info(f"Successfully connected to {provider_type} RPC provider") - # Create contract instance and return web3 instance, contract instance, and provider URL - contract = w3.eth.contract(address=Web3.to_checksum_address(contract_address), abi=contract_abi) - return w3, contract, rpc_url - else: - logger.warning(f"Could not connect to {provider_type} RPC provider: {rpc_url}") - except Exception as e: - provider_type = "primary" if i == 0 else f"backup #{i}" - logger.warning(f"Error connecting to {provider_type} RPC provider {rpc_url}: {str(e)}") - # If we get here, all providers failed - raise ConnectionError(f"Failed to connect to any of {len(rpc_providers)} RPC providers: {rpc_providers}") - - -def _setup_transaction_account(private_key: str, w3) -> tuple[str, object]: - """ - Get the address of the account from the private key. 
- Args: - private_key: Private key for the account - w3: Web3 instance - Returns: - str: Address of the account - """ - try: - account = w3.eth.account.from_key(private_key) - logger.info(f"Using account: {account.address}") - return account.address - # If the account cannot be retrieved, log the error and raise an exception - except Exception as e: - logger.error(f"Failed to retrieve account from private key: {str(e)}") - raise - - -def _estimate_transaction_gas( - w3, contract_func, indexer_addresses: list[str], data_bytes: bytes, sender_address: str -) -> int: - """ - Estimate gas for the transaction with 25% buffer. - Args: - w3: Web3 instance - contract_func: Contract function to call - indexer_addresses: List of indexer addresses - data_bytes: Data bytes for the transaction - sender_address: Transaction sender address - Returns: - int: Estimated gas with 25% buffer - """ - # Try to estimate the gas for the transaction - try: - estimated_gas = contract_func(indexer_addresses, data_bytes).estimate_gas({"from": sender_address}) - gas_limit = int(estimated_gas * 1.25) # 25% buffer - logger.info(f"Estimated gas: {estimated_gas}, with buffer: {gas_limit}") - return gas_limit - # If the gas estimation fails, raise an error - except Exception as e: - logger.error(f"Gas estimation failed: {str(e)}") - raise - - -def _determine_transaction_nonce(w3, sender_address: str, replace: bool) -> int: - """ - Determine the appropriate nonce for the transaction. - Args: - w3: Web3 instance - sender_address: Transaction sender address - replace: Whether to replace pending transactions - Returns: - int: Transaction nonce to use - """ - # If we are not replacing a pending transaction, use the next available nonce - if not replace: - nonce = w3.eth.get_transaction_count(sender_address) - logger.info(f"Using next available nonce: {nonce}") - return nonce - # If we are replacing a pending transaction, try to find and replace it - logger.info("Attempting to find and replace a pending transaction") - # Try to find pending transactions - try: - pending_txs = w3.eth.get_block("pending", full_transactions=True) - sender_pending_txs = [ - tx for tx in pending_txs.transactions if hasattr(tx, "from") and tx["from"] == sender_address - ] - # If we found pending transactions, use the nonce of the first pending transaction - if sender_pending_txs: - sender_pending_txs.sort(key=lambda x: x["nonce"]) - nonce = sender_pending_txs[0]["nonce"] - logger.info(f"Found pending transaction with nonce {nonce} for replacement") - return nonce - # If we could not find pending transactions log a warning - except Exception as e: - logger.warning(f"Could not check pending transactions: {str(e)}") - # Check for nonce gaps - try: - current_nonce = w3.eth.get_transaction_count(sender_address, "pending") - latest_nonce = w3.eth.get_transaction_count(sender_address, "latest") - if current_nonce > latest_nonce: - logger.info(f"Detected nonce gap: latest={latest_nonce}, pending={current_nonce}") - return latest_nonce - except Exception as e: - logger.warning(f"Could not check nonce gap: {str(e)}") - # Fallback to next available nonce - nonce = w3.eth.get_transaction_count(sender_address) - logger.info(f"Using next available nonce: {nonce}") - return nonce - - -def _get_gas_prices(w3, replace: bool) -> tuple[int, int]: - """Get base fee and max priority fee for transaction.""" - # Get current gas prices with detailed logging - try: - latest_block = w3.eth.get_block("latest") - base_fee = latest_block["baseFeePerGas"] - logger.info(f"Latest 
block base fee: {base_fee/1e9:.2f} gwei") - # If the base fee cannot be retrieved, use a fallback value - except Exception as e: - logger.warning(f"Could not get base fee: {e}") - base_fee = w3.to_wei(10, "gwei") - try: - max_priority_fee = w3.eth.max_priority_fee - logger.info(f"Max priority fee: {max_priority_fee/1e9:.2f} gwei") - except Exception as e: - logger.warning(f"Could not get max priority fee: {e}") - max_priority_fee = w3.to_wei(2, "gwei") # fallback - return base_fee, max_priority_fee - - -def _build_transaction_params( - sender_address: str, - nonce: int, - chain_id: int, - gas_limit: int, - base_fee: int, - max_priority_fee: int, - replace: bool, -) -> dict: - """Build transaction parameters with appropriate gas prices.""" - tx_params = {"from": sender_address, "nonce": nonce, "chainId": chain_id, "gas": gas_limit} - # Set gas prices (higher for replacement transactions) - if replace: - max_fee_per_gas = base_fee * 4 + max_priority_fee * 2 - max_priority_fee_per_gas = max_priority_fee * 2 - tx_params["maxFeePerGas"] = max_fee_per_gas - tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas - logger.info(f"High gas for replacement: {max_fee_per_gas/1e9:.2f} gwei") - # If we are not replacing a pending transaction, use a lower gas price - else: - max_fee_per_gas = base_fee * 2 + max_priority_fee - max_priority_fee_per_gas = max_priority_fee - tx_params["maxFeePerGas"] = max_fee_per_gas - tx_params["maxPriorityFeePerGas"] = max_priority_fee_per_gas - logger.info(f"Standard gas: {max_fee_per_gas/1e9:.2f} gwei") - logger.info(f"Transaction parameters: nonce={nonce}, gas={gas_limit}, chain_id={chain_id}") - return tx_params - - -def _build_and_sign_transaction( - w3, contract_func, indexer_addresses: list[str], data_bytes: bytes, tx_params: dict, private_key: str -): - """Build and sign the transaction.""" - # Attempt to build the transaction - try: - transaction = contract_func(indexer_addresses, data_bytes).build_transaction(tx_params) - logger.info("Transaction built successfully") - # If the transaction cannot be built, log the error and raise an exception - except Exception as e: - logger.error(f"Failed to build transaction: {e}") - logger.error(f"Contract function: {contract_func}") - logger.error(f"Indexer addresses count: {len(indexer_addresses)}") - logger.error(f"Data bytes length: {len(data_bytes)}") - logger.error(f"Transaction params: {tx_params}") - raise - # Attempt to sign the transaction - try: - signed_tx = w3.eth.account.sign_transaction(transaction, private_key) - logger.info("Transaction signed successfully") - return signed_tx - # If the transaction cannot be signed, log the error and raise an exception - except Exception as e: - logger.error(f"Failed to sign transaction: {e}") - raise - - -def _handle_transaction_error(error_msg: str) -> None: - """Handle and log specific transaction error types.""" - if "insufficient funds" in error_msg.lower(): - logger.error("Insufficient funds to pay for gas") - elif "nonce too low" in error_msg.lower(): - logger.error("Nonce is too low - transaction may have already been sent") - elif "nonce too high" in error_msg.lower(): - logger.error("Nonce is too high - there may be pending transactions") - elif "gas" in error_msg.lower(): - logger.error("Gas-related issue - transaction may consume too much gas") - elif "400" in error_msg: - logger.error("HTTP 400 Bad Request - RPC provider rejected the request") - - -def _send_signed_transaction(w3, signed_tx) -> str: - """Send the signed transaction and handle errors.""" - 
# Attempt to send the transaction to the network - try: - tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction) - logger.info(f"Transaction sent! Hash: {tx_hash.hex()}") - return tx_hash.hex() - # If the transaction could not be sent, log the error and raise an exception - except ValueError as e: - error_msg = str(e) - logger.error(f"Transaction rejected by network: {error_msg}") - _handle_transaction_error(error_msg) - raise - except Exception as e: - logger.error(f"Unexpected error sending transaction: {e}") - logger.error(f"Error type: {type(e).__name__}") - raise - - -def _build_and_send_transaction( - w3, - contract_func, - indexer_addresses: list[str], - data_bytes: bytes, - sender_address: str, - private_key: str, - chain_id: int, - gas_limit: int, - nonce: int, - replace: bool, -) -> str: - """ - Build, sign, and send the transaction. - Args: - w3: Web3 instance - contract_func: Contract function to call - indexer_addresses: List of indexer addresses - data_bytes: Data bytes for transaction - sender_address: Transaction sender address - private_key: Private key for signing - chain_id: Chain ID - gas_limit: Gas limit for transaction - nonce: Transaction nonce - replace: Whether this is a replacement transaction - Returns: - str: Transaction hash - """ - try: - # Get gas prices - base_fee, max_priority_fee = _get_gas_prices(w3, replace) - # Build transaction parameters - tx_params = _build_transaction_params( - sender_address, nonce, chain_id, gas_limit, base_fee, max_priority_fee, replace - ) - # Build and sign transaction - signed_tx = _build_and_sign_transaction( - w3, contract_func, indexer_addresses, data_bytes, tx_params, private_key - ) - # Send transaction - return _send_signed_transaction(w3, signed_tx) - except Exception as e: - logger.error(f"Error in _build_and_send_transaction: {e}") - raise - - -# ============================================================================= -# BLOCKCHAIN TRANSACTION FUNCTIONS (MID-LEVEL) -# ============================================================================= -def _execute_transaction_with_rpc_failover( - operation_name: str, rpc_providers: list[str], contract_address: str, operation_func, operation_params: dict -): - """ - Execute a transaction operation with automatic RPC failover. - This function tries each RPC provider in sequence until one succeeds. - If an RPC fails during any part of the transaction process, it moves to the next one. - Args: - operation_name: Human-readable name for the transaction operation, used for logging purposes - rpc_providers: List of RPC provider URLs to try connecting to - contract_address: Contract address - operation_func: Function that takes (w3, contract, operation_params) and does 'operation_name' operation - default 'operation_func' is _execute_complete_transaction() - operation_params: Parameters for the operation, e.g. 
- { - "private_key": private_key, - "contract_function": contract_function, - "indexer_addresses": indexer_addresses, - "data_bytes": data_bytes, - "sender_address": sender_address, - "account": account, - "chain_id": chain_id, - "replace": replace - } - Returns: - Result of the operation_func - Raises: - Exception: If all RPC providers fail - """ - # Initialize last_exception to None - last_exception = None - for rpc_url in rpc_providers: - try: - # Log the attempt - logger.info(f"Attempting to do '{operation_name}' using RPC provider: {rpc_url}") - # Get fresh connection for this rpc provider attempt - w3, contract, _ = _get_working_web3_connection([rpc_url], contract_address, _load_contract_abi()) - # Execute the operation with this rpc provider and return the result - return operation_func(w3, contract, operation_params) - # If the operation fails, log the error and continue to the next rpc provider - except Exception as e: - logger.warning(f"{operation_name} failed with RPC provider {rpc_url}: {str(e)}") - # Store the exception for later use - last_exception = e - # If we get here, all providers failed - logger.error(f"{operation_name} failed on all {len(rpc_providers)} RPC providers") - raise last_exception or Exception(f"All RPC providers failed for {operation_name}") - - -def _execute_complete_transaction(w3, contract, params): - """ - Execute the complete transaction process using a single RPC connection. - Args: - w3: Web3 instance - contract: Contract instance - params: Dictionary containing all transaction parameters - Returns: - str: Transaction hash - """ - # Extract parameters - private_key = params["private_key"] - contract_function = params["contract_function"] - indexer_addresses = params["indexer_addresses"] - data_bytes = params["data_bytes"] - sender_address = params["sender_address"] - chain_id = params["chain_id"] - replace = params["replace"] - # Validate contract function exists - if not hasattr(contract.functions, contract_function): - raise ValueError(f"Contract {contract.address} does not have function: {contract_function}") - contract_func = getattr(contract.functions, contract_function) - # Log transaction details - logger.info(f"Contract address: {contract.address}") - logger.info(f"Contract function: {contract_function}") - logger.info(f"Number of indexers: {len(indexer_addresses)}") - logger.info(f"Data bytes length: {len(data_bytes)}") - logger.info(f"Chain ID: {chain_id}") - logger.info(f"Sender address: {sender_address}") - logger.info(f"Using RPC: {w3.provider.endpoint_uri}") - # Check account balance - balance_wei = w3.eth.get_balance(sender_address) - balance_eth = w3.from_wei(balance_wei, "ether") - logger.info(f"Account balance: {balance_eth} ETH") - # All transaction steps with the same RPC connection - gas_limit = _estimate_transaction_gas(w3, contract_func, indexer_addresses, data_bytes, sender_address) - nonce = _determine_transaction_nonce(w3, sender_address, replace) - tx_hash = _build_and_send_transaction( - w3, - contract_func, - indexer_addresses, - data_bytes, - sender_address, - private_key, - chain_id, - gas_limit, - nonce, - replace, - ) - # Wait for receipt with the same connection - try: - tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=30) - if tx_receipt["status"] == 1: - logger.info( - f"Transaction confirmed in block {tx_receipt['blockNumber']}, gas used: {tx_receipt['gasUsed']}" - ) - else: - logger.error(f"Transaction failed on-chain: {tx_hash}") - except Exception as e: - logger.warning(f"Could not get 
transaction receipt: {str(e)} (transaction may still be pending)") - return tx_hash - - -def _send_transaction_to_allow_indexers_in_list_to_claim_issuance( - list_of_indexers_that_can_claim_issuance: list[str], - private_key: str, - chain_id: int, - rpc_providers: list[str], - contract_address: str, - contract_function: str, - replace: bool = False, - data_bytes: bytes = b"", -) -> str: - """ - Send a transaction to the indexer eligibility oracle contract to allow a subset of indexers - to claim issuance rewards. - This function builds, signs, and sends a transaction to the blockchain using RPC failover. - This function is called by the batch_allow_indexers_issuance_eligibility_smart_contract function, which handles - batching of transactions if the list before input into this function. - Args: - list_of_indexers_that_can_claim_issuance: List of indexer addresses to allow issuance - private_key: Private key for transaction signing - chain_id: Chain ID of the target blockchain - rpc_providers: List of RPC provider URLs (primary + backups) - contract_address: Contract address - contract_function: Contract function name to call - replace: Flag to replace pending transactions - data_bytes: Optional bytes data to pass to contract function - Returns: - str: Transaction hash - """ - # Set up account - from web3 import Web3 - - temp_w3 = Web3() - sender_address = _setup_transaction_account(private_key, temp_w3) - # Convert addresses to checksum format - checksum_addresses = [Web3.to_checksum_address(addr) for addr in list_of_indexers_that_can_claim_issuance] - # Prepare all parameters for the transaction - transaction_params = { - "private_key": private_key, - "contract_function": contract_function, - "indexer_addresses": checksum_addresses, - "data_bytes": data_bytes, - "sender_address": sender_address, - "chain_id": chain_id, - "replace": replace, - } - # Execute the transaction to allow indexers to claim issuance with RPC failover - try: - return _execute_transaction_with_rpc_failover( - "Allow indexers to claim issuance", - rpc_providers, - contract_address, - _execute_complete_transaction, - transaction_params, - ) - except Exception as e: - logger.error(f"Transaction failed on all RPC providers: {str(e)}") - raise - - -# ============================================================================= -# HIGH-LEVEL BATCH TRANSACTION FUNCTION -# ============================================================================= -def batch_allow_indexers_issuance_eligibility_smart_contract( - list_of_indexers_to_allow: list[str], replace: bool = False, batch_size: int = 250, data_bytes: bytes = b"" -) -> list[str]: - """ - Allow the issuance eligibility status of a list of indexers in the smart contract. - This function handles batching of transactions if the list is too large for a single - transaction, and uses key validation for private keys. 
- Args: - list_of_indexers_to_allow: List of indexer addresses to allow - replace: Optional flag to replace pending transactions - batch_size: Optional batch size for processing large lists - data_bytes: Optional bytes data to pass to contract_address:contract_function - Returns: - List[str]: List of transaction hashes from successful batches - Raises: - ConfigurationError: If configuration loading fails - ValueError: If configuration is invalid - ConnectionError: If unable to connect to any RPC providers - Exception: If transaction processing fails - """ - # Get config - config = _load_config_and_return_validated() - # Validate function parameters look correct - if not list_of_indexers_to_allow: - logger.warning("No indexers provided to allow. Returning empty list.") - return [] - if batch_size <= 0: - raise ValueError("batch_size must be positive") - # Calculate number of batches to process - total_indexers_to_allow = len(list_of_indexers_to_allow) - num_batches = (total_indexers_to_allow + batch_size - 1) // batch_size - logger.info(f"Processing {total_indexers_to_allow} indexers in {num_batches} batch(es) of {batch_size}") - try: - tx_links = [] - # Validate and format private key - private_key = validate_and_format_private_key(str(config["private_key"])) - # Process each batch - for i in range(num_batches): - start_idx = i * batch_size - end_idx = min(start_idx + batch_size, total_indexers_to_allow) - batch_indexers = list_of_indexers_to_allow[start_idx:end_idx] - logger.info(f"Processing batch {i+1}/{num_batches} with {len(batch_indexers)} indexers") - try: - tx_hash = _send_transaction_to_allow_indexers_in_list_to_claim_issuance( - batch_indexers, - private_key, - int(config["chain_id"]), - list(config["rpc_providers"]), - str(config["contract_address"]), - str(config["contract_function"]), - replace, - data_bytes, - ) - tx_links.append(f"https://sepolia.arbiscan.io/tx/{tx_hash}") - logger.info(f"Batch {i+1} transaction successful: {tx_hash}") - except Exception as e: - logger.error(f"Error processing batch {i+1} due to: {e}") - # Print all the transaction links - for i, tx_link in enumerate(tx_links, 1): - logger.info(f"Transaction link {i} of {len(tx_links)}: {tx_link}") - return tx_links - except KeyValidationError as e: - logger.error(f"Private key validation failed: {e}") - raise ValueError(f"Invalid private key: {e}") from e - - -# ============================================================================= -# MAIN BIGQUERY DATA PROCESSING FUNCTION -# ============================================================================= -@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=30, max=120), reraise=True) -def bigquery_fetch_and_save_indexer_issuance_eligibility_data_finally_return_eligible_indexers( - start_date: date, - end_date: date, - current_date: date, - max_age_before_deletion: int, -) -> list[str]: - """ - Main function to fetch and process data from BigQuery. 
- Returns: - List[str]: List of indexers that should be allowed issuance based on BigQuery data - """ - # Load config using secure configuration loader - config = _load_config_and_return_validated() - # Initialize the BigQuery provider class so we can use its methods to fetch data from BigQuery - bq_provider = BigQueryProvider( - project=str(config["bigquery_project_id"]), location=str(config["bigquery_location"]) - ) - try: - # Fetch eligibility dataframe - logger.info(f"Fetching eligibility data between {start_date} and {end_date}") - indexer_issuance_eligibility_data = bq_provider.fetch_indexer_issuance_eligibility_data( - start_date, end_date - ) - logger.info(f"Retrieved issuance eligibility data for {len(indexer_issuance_eligibility_data)} indexers") - # Store the output directory paths as variables so we can pass them to other functions - output_dir = _get_path_to_project_root() / "data" / "output" - date_dir = output_dir / current_date.strftime("%Y-%m-%d") - # Export separate lists for eligible and ineligible indexers - logger.info(f"Attempting to export indexer issuance eligibility lists to: {date_dir}") - eligible_indexers, ineligible_indexers = ( - _export_bigquery_data_as_csvs_and_return_lists_of_ineligible_and_eligible_indexers( - indexer_issuance_eligibility_data, date_dir - ) - ) - logger.info("Exported indexer issuance eligibility lists.") - # Clean old eligibility lists - logger.info("Cleaning old eligibility lists.") - _clean_old_date_directories(output_dir, max_age_before_deletion) - # Log final summary - logger.info(f"Processing complete. Output available at: {date_dir}") - # Log the number of eligible indexers - logger.info( - f"No. of elig. indxrs. to insert into smart contract on {date.today()} is: {len(eligible_indexers)}" - ) - # Return list of indexers that should be allowed issuance - return eligible_indexers - except Exception as e: - logger.error(f"Error processing data: {str(e)}", exc_info=True) - raise - - -# ============================================================================= -# FUTURE FUNCTIONS (NOT USED YET) -# ============================================================================= -def _fetch_issuance_enabled_indexers_from_subgraph() -> list[str]: - """ - TODO: fix this once we have the subgraph - Queries the indexer eligibility subgraph to get the list of indexers that are - currently allowed issuance. 
- Returns: - List[str]: A list of indexer addresses that are currently allowed issuance - """ - # Load config and check that the necessary variables are set - config = _load_config_and_return_validated() - subgraph_url = config.get("subgraph_url") - studio_api_key = config.get("studio_api_key") - if not subgraph_url: - raise ValueError("SUBGRAPH_URL_PRODUCTION not set in configuration") - if not studio_api_key: - raise ValueError("STUDIO_API_KEY not set in configuration") - logger.info("Configuration for subgraph query loaded successfully.") - try: - # Initialize the subgraph provider class so we can use its methods to fetch data from our subgraph - subgraph_provider = SubgraphProvider() - # Fetch all indexers from the subgraph - indexers_data = subgraph_provider.fetch_all_indexers() - logger.info(f"Retrieved data for {len(indexers_data)} indexers from subgraph") - # Extract currently denied indexers (those where isDenied is True) - allowed_indexers = [] - for indexer in indexers_data: - if indexer.get("isDenied", False): - allowed_indexers.append(indexer["id"].lower()) - logger.info(f"Found {len(allowed_indexers)} indexers that are currently allowed issuance") - return allowed_indexers - except Exception as e: - logger.error(f"Error fetching allowed indexers from subgraph: {str(e)}", exc_info=True) - raise diff --git a/src/models/issuance_eligibility_oracle_core.py b/src/models/issuance_eligibility_oracle_core.py deleted file mode 100644 index 6c4b162..0000000 --- a/src/models/issuance_eligibility_oracle_core.py +++ /dev/null @@ -1,144 +0,0 @@ -""" -Service Quality Oracle's core module for fetching & processing data. -This module serves as the entry point for the oracle functionality, responsible for: -1. Fetching eligibility data from BigQuery -2. Processing indexer data to determine eligibility -3. Submitting eligible indexers to the blockchain contract -4. Sending Slack notifications about run status -For blockchain interactions and data processing utilities, see issuance_data_access_helper.py. -""" - -import logging -import os -import sys -import time -from datetime import date, timedelta - -# Add project root to path -project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) -sys.path.insert(0, project_root) -# Import data access utilities with absolute import -from src.models.issuance_data_access_helper import ( - _setup_google_credentials_in_memory_from_env_var, - batch_allow_indexers_issuance_eligibility_smart_contract, - bigquery_fetch_and_save_indexer_issuance_eligibility_data_finally_return_eligible_indexers, -) -from src.utils.config_loader import load_config -from src.utils.slack_notifier import create_slack_notifier - -# Set up basic logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - - -def main(): - """ - Main entry point for the Service Quality Oracle. - This function: - 1. Sets up Google credentials (if not already set up by scheduler) - 2. Fetches and processes indexer eligibility data - 3. Submits eligible indexers to the blockchain - 4. 
Sends Slack notifications about the run status - """ - start_time = time.time() - slack_notifier = None - - try: - # Load configuration to get Slack webhook and other settings - config = load_config() - slack_notifier = create_slack_notifier(config.get("SLACK_WEBHOOK_URL")) - if slack_notifier: - logger.info("Slack notifications enabled") - else: - logger.info("Slack notifications disabled (no webhook URL configured)") - - except Exception as e: - logger.error(f"Failed to load configuration: {str(e)}") - sys.exit(1) - - try: - # Attempt to load google bigquery data access credentials - try: - import google.auth - - _ = google.auth.default() - # If credentials could not be loaded, set them up in memory via helper function using environment variables - except Exception: - _setup_google_credentials_in_memory_from_env_var() - - try: - # Fetch + save indexer eligibility data and return eligible list as 'eligible_indexers' array - eligible_indexers = ( - bigquery_fetch_and_save_indexer_issuance_eligibility_data_finally_return_eligible_indexers( - start_date=date.today() - timedelta(days=28), - end_date=date.today(), - current_date=date.today(), - max_age_before_deletion=config.get("MAX_AGE_BEFORE_DELETION"), - ) - ) - - logger.info(f"Found {len(eligible_indexers)} eligible indexers.") - - # Send eligible indexers to the blockchain contract - try: - transaction_links = batch_allow_indexers_issuance_eligibility_smart_contract( - eligible_indexers, replace=True, batch_size=config.get("BATCH_SIZE"), data_bytes=b"" - ) - - # Calculate execution time and send success notification - execution_time = time.time() - start_time - logger.info(f"Oracle run completed successfully in {execution_time:.2f} seconds") - - if slack_notifier: - # Calculate batch information for notification - config.get("BATCH_SIZE", 125) - batch_count = len(transaction_links) if transaction_links else 0 - total_processed = len(eligible_indexers) - - slack_notifier.send_success_notification( - eligible_indexers=eligible_indexers, - total_processed=total_processed, - execution_time=execution_time, - transaction_links=transaction_links, - batch_count=batch_count, - ) - - except Exception as e: - execution_time = time.time() - start_time - error_msg = f"Failed to allow indexers to claim issuance because: {str(e)}" - logger.error(error_msg) - - if slack_notifier: - slack_notifier.send_failure_notification( - error_message=str(e), stage="Blockchain Submission", execution_time=execution_time - ) - - sys.exit(1) - - except Exception as e: - execution_time = time.time() - start_time - error_msg = f"Failed to process indexer issuance eligibility data because: {str(e)}" - logger.error(error_msg) - - if slack_notifier: - slack_notifier.send_failure_notification( - error_message=str(e), stage="Data Processing", execution_time=execution_time - ) - - sys.exit(1) - - except Exception as e: - execution_time = time.time() - start_time - error_msg = f"Oracle initialization or authentication failed: {str(e)}" - logger.error(error_msg) - - if slack_notifier: - slack_notifier.send_failure_notification( - error_message=str(e), stage="Initialization", execution_time=execution_time - ) - - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/src/models/scheduler.py b/src/models/scheduler.py index e0d5adc..55440aa 100644 --- a/src/models/scheduler.py +++ b/src/models/scheduler.py @@ -8,10 +8,8 @@ import schedule from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential -from src.models.issuance_data_access_helper 
import ( - _setup_google_credentials_in_memory_from_env_var, -) -from src.utils.config_loader import load_config +import src.models.service_quality_oracle as oracle +from src.utils.configuration import credential_manager, load_config, validate_all_required_env_vars from src.utils.slack_notifier import create_slack_notifier # Configure logging @@ -21,258 +19,209 @@ handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger("oracle-scheduler") + # Path to store last run info LAST_RUN_FILE = "/app/data/last_run.txt" HEALTHCHECK_FILE = "/app/healthcheck" -# Create a global slack notifier instance -slack_notifier = None +class Scheduler: -def get_last_run_date(): - """Get the date of the last successful run from a persistent file""" - if os.path.exists(LAST_RUN_FILE): - try: - with open(LAST_RUN_FILE) as f: - last_run_str = f.read().strip() - return datetime.strptime(last_run_str, "%Y-%m-%d").date() - except Exception as e: - logger.error(f"Error reading last run date: {e}") - return None - - -def save_last_run_date(run_date): - """Save the date of the last successful run to a file that we continuously overwrite each time""" - try: - os.makedirs(os.path.dirname(LAST_RUN_FILE), exist_ok=True) - with open(LAST_RUN_FILE, "w") as f: - f.write(run_date.strftime("%Y-%m-%d")) - except Exception as e: - logger.error(f"Error saving last run date: {e}") - - -def update_healthcheck(message=None): - """Update the healthcheck file with current timestamp and optional message""" - try: - with open(HEALTHCHECK_FILE, "w") as f: - timestamp = datetime.now().isoformat() - f.write(f"Last update: {timestamp}") - if message: - f.write(f"\n{message}") - except Exception as e: - logger.warning(f"Failed to update healthcheck file: {e}") - - -@retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=60, max=600), - retry=retry_if_exception_type(Exception), - before_sleep=lambda retry_state: logger.warning( - f"Retry attempt {retry_state.attempt_number} after error: {retry_state.outcome.exception()}" - ), -) -def run_oracle(force_date=None): - """ - Function to run the Service Quality Oracle - - Args: - force_date: If provided, override the date for this run - """ - global slack_notifier - today = force_date or datetime.now().date() - start_time = datetime.now() - logger.info(f"Starting Service Quality Oracle run at {start_time} for date {today}") - # Ensure we have valid google credentials before proceeding - _setup_google_credentials_in_memory_from_env_var() - - # Attempt to run the oracle - try: - # Load latest configuration using config loader - load_config() - - # Run the oracle - import src.models.issuance_eligibility_oracle_core as oracle - - oracle.main() - - # Record successful run and overwrite the last run date - save_last_run_date(today) - end_time = datetime.now() - duration_in_seconds = (end_time - start_time).total_seconds() - success_message = f"Run completed successfully for {today}. Duration: {duration_in_seconds:.2f}s" - logger.info(f"Service Quality Oracle {success_message}") + def __init__(self): + self.slack_notifier = None + self.config = self.initialize() - # Touch healthcheck file to indicate successful runs - update_healthcheck(success_message) - # Send success notification from scheduler - if slack_notifier: - slack_notifier.send_success_notification( - message=f"Run completed successfully for {today}. 
Duration: {duration_in_seconds:.2f}s", - title="Scheduled Run Success", - ) + def get_last_run_date(self): + """ + Get the date of the last successful run from a persistent file. + If the last run is older than 7 days, cap it at 7 days ago to limit BigQuery costs. + """ + last_run_date = None + if os.path.exists(LAST_RUN_FILE): + try: + with open(LAST_RUN_FILE) as f: + last_run_str = f.read().strip() + last_run_date = datetime.strptime(last_run_str, "%Y-%m-%d").date() + except Exception as e: + logger.error(f"Error reading or parsing last run date file: {e}") + return None - # Return True to indicate success - return True + today = datetime.now().date() + seven_days_ago = today - timedelta(days=7) - # If there is an error when trying to run the oracle, log the error and raise an exception - except Exception as e: - error_message = f"Run failed due to: {str(e)}" - logger.error(error_message, exc_info=True) + if last_run_date and last_run_date < seven_days_ago: + logger.warning( + f"Last successful run was on {last_run_date}, which is more than 7 days ago. " + f"Capping backfill to 7 days to conserve BigQuery credits." + ) + return seven_days_ago - # Update healthcheck file to indicate failure - update_healthcheck(f"ERROR: {error_message}") + return last_run_date - # Send failure notification to slack - if slack_notifier: - duration = (datetime.now() - start_time).total_seconds() - slack_notifier.send_failure_notification( - error_message=str(e), - stage="Scheduled Run" if force_date is None else f"Missed Run ({force_date})", - execution_time=duration, - ) - # Raise an exception to indicate failure - raise - - -def check_missed_runs(): - """Check if we missed any runs and execute them if needed""" - global slack_notifier - today = datetime.now().date() - last_run = get_last_run_date() - if last_run is None: - logger.info("No record of previous runs. Will run at next scheduled time.") - return False - if last_run < today - timedelta(days=1): - # We missed at least one day - missed_days = (today - last_run).days - 1 - logger.warning(f"Detected {missed_days} missed runs. Last run was on {last_run}.") - - # Send notification about missed runs - if slack_notifier: - message = ( - f"Detected {missed_days} missed oracle runs. " - f"Last successful run was on {last_run}. " - "Attempting to execute missed run for yesterday." 
- ) - slack_notifier.send_info_notification( - message=message, - title="Missed Runs Detected", - ) + def save_last_run_date(self, run_date): + """Save the date of the last successful run to a file that we continuously overwrite each time""" + try: + os.makedirs(os.path.dirname(LAST_RUN_FILE), exist_ok=True) + with open(LAST_RUN_FILE, "w") as f: + f.write(run_date.strftime("%Y-%m-%d")) + except Exception as e: + logger.error(f"Error saving last run date: {e}") + - # Run for the missed day (just run for yesterday, not all missed days) - yesterday = today - timedelta(days=1) - logger.info(f"Executing missed run for {yesterday}") + def update_healthcheck(self, message=None): + """Update the healthcheck file with current timestamp and optional message""" try: - run_oracle(force_date=yesterday) - return True + with open(HEALTHCHECK_FILE, "w") as f: + timestamp = datetime.now().isoformat() + f.write(f"Last update: {timestamp}") + if message: + f.write(f"\n{message}") except Exception as e: - logger.error(f"Failed to execute missed run for {yesterday}: {e}") - return False - return False - - -def initialize(): - """Initialize the scheduler and validate configuration""" - global slack_notifier - logger.info("Initializing scheduler...") - try: - # Early validation of required environment variables - from src.utils.config_loader import validate_all_required_env_vars - - logger.info("Validating required environment variables...") - validate_all_required_env_vars() - - # Validate credentials early to fail fast if there are issues - _setup_google_credentials_in_memory_from_env_var() - - # Load and validate configuration - config = load_config() - - # Initialize Slack notifications - slack_notifier = create_slack_notifier(config.get("slack_webhook_url")) - if slack_notifier: - logger.info("Slack notifications enabled for scheduler") - - # Send startup notification - startup_message = ( - f"Service Quality Oracle scheduler started successfully.\n" - f"**Scheduled time:** {config['scheduled_run_time']} UTC\n" - f"**Environment:** {os.environ.get('ENVIRONMENT', 'unknown')}" - ) - slack_notifier.send_info_notification( - message=startup_message, - title="Scheduler Started", - ) - else: - logger.info("Slack notifications disabled for scheduler") - - # Set timezone for consistent scheduling - timezone = pytz.timezone("UTC") - logger.info(f"Using timezone: {timezone}") - # Schedule the job - run_time = config["scheduled_run_time"] - logger.info(f"Scheduling daily run at {run_time} UTC") - schedule.every().day.at(run_time).do(run_oracle) - # Create initial healthcheck file - update_healthcheck("Scheduler initialized") - # Run on startup if requested - if os.environ.get("RUN_ON_STARTUP", "false").lower() == "true": - logger.info("RUN_ON_STARTUP=true, executing oracle immediately") - run_oracle() - else: - # Check for missed runs - logger.info("Checking for missed runs...") - if check_missed_runs(): - logger.info("Executed missed run successfully") + logger.warning(f"Failed to update healthcheck file: {e}") + + + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=60, max=600), + retry=retry_if_exception_type(Exception), + before_sleep=lambda retry_state: logger.warning( + f"Retry attempt {retry_state.attempt_number} after error: {retry_state.outcome.exception()}" + ), + ) + def run_oracle(self, run_date_override=None): + """ + Function to run the Service Quality Oracle + + Args: + run_date_override: If provided, override the date for this run + """ + run_date = run_date_override or 
datetime.now().date() + start_time = datetime.now() + logger.info(f"Starting Service Quality Oracle run at {start_time} for date {run_date}") + + # The oracle.main() function handles its own exceptions, notifications, and credential setup. + # The scheduler's role is simply to trigger it and handle the retry logic. + oracle.main(run_date_override=run_date) + + # If oracle.main() completes without sys.exit, it was successful. + # Record successful run and update healthcheck. + self.save_last_run_date(run_date) + end_time = datetime.now() + duration_in_seconds = (end_time - start_time).total_seconds() + success_message = ( + f"Scheduler successfully triggered oracle run for {run_date}. Duration: {duration_in_seconds:.2f}s" + ) + logger.info(success_message) + self.update_healthcheck(success_message) + + + def check_missed_runs(self): + """Check if we missed any runs and execute them if needed""" + today = datetime.now().date() + last_run = self.get_last_run_date() + + if last_run is None: + logger.info("No record of previous runs. Will run at next scheduled time.") + return + + if last_run < today - timedelta(days=1): + missed_days = (today - last_run).days - 1 + logger.warning(f"Detected {missed_days} missed runs. Last run was on {last_run}.") + + if self.slack_notifier: + message = ( + f"Detected {missed_days} missed oracle runs. " + f"Last successful run was on {last_run}. " + "Attempting to execute missed run for yesterday." + ) + self.slack_notifier.send_info_notification( + message=message, + title="Missed Runs Detected", + ) + + yesterday = today - timedelta(days=1) + logger.info(f"Executing missed run for {yesterday}") + # The run_oracle method is decorated with @retry, so it will handle its own retries. + self.run_oracle(run_date_override=yesterday) + + + def initialize(self): + """Initialize the scheduler and validate configuration""" + logger.info("Initializing scheduler...") + try: + validate_all_required_env_vars() + + credential_manager.setup_google_credentials() + config = load_config() + + self.slack_notifier = create_slack_notifier(config.get("SLACK_WEBHOOK_URL")) + if self.slack_notifier: + logger.info("Slack notifications enabled for scheduler") + startup_message = ( + f"Service Quality Oracle scheduler started successfully.\n" + f"**Scheduled time:** {config['SCHEDULED_RUN_TIME']} UTC\n" + f"**Environment:** {os.environ.get('ENVIRONMENT', 'unknown')}" + ) + self.slack_notifier.send_info_notification( + message=startup_message, + title="Scheduler Started", + ) else: - logger.info("No missed runs to execute") - return config - except Exception as e: - logger.error(f"Failed to initialize scheduler: {e}", exc_info=True) - - # Try to send failure notification even if initialization failed - if slack_notifier: - slack_notifier.send_failure_notification( - error_message=str(e), stage="Scheduler Initialization", execution_time=0 - ) + logger.info("Slack notifications disabled for scheduler") - sys.exit(1) + pytz.timezone("UTC") + run_time = config["SCHEDULED_RUN_TIME"] + logger.info(f"Scheduling daily run at {run_time} UTC") + schedule.every().day.at(run_time).do(self.run_oracle, run_date_override=None) + self.update_healthcheck("Scheduler initialized") -if __name__ == "__main__": - # Initialize the scheduler - config = initialize() - logger.info("Scheduler started and waiting for scheduled runs") - - # Main loop - try: - while True: - schedule.run_pending() - # Update healthcheck file periodically (every 30 seconds) - if datetime.now().second % 30 == 0: - 
update_healthcheck("Scheduler heartbeat") - - # Sleep - time.sleep(15) - - except KeyboardInterrupt: - logger.info("Scheduler stopped by user") - - if slack_notifier: - slack_notifier.send_info_notification( - message="Scheduler stopped by user interrupt", title="Scheduler Stopped" - ) + if os.environ.get("RUN_ON_STARTUP", "false").lower() == "true": + logger.info("RUN_ON_STARTUP=true, executing oracle immediately") + self.run_oracle() + else: + # Check for missed runs + logger.info("Checking for missed runs...") + self.check_missed_runs() - except Exception as e: - logger.error(f"Scheduler crashed: {e}", exc_info=True) + return config - # Send failure notification to slack - if slack_notifier: - slack_notifier.send_failure_notification( - error_message=str(e), stage="Scheduler Runtime", execution_time=0 - ) + except Exception as e: + logger.error(f"Failed to initialize scheduler: {e}", exc_info=True) + if self.slack_notifier: + self.slack_notifier.send_failure_notification( + error_message=str(e), stage="Scheduler Initialization", execution_time=0 + ) + sys.exit(1) + + + def run(self): + """Main loop for the scheduler""" + logger.info("Scheduler started and waiting for scheduled runs") + try: + while True: + schedule.run_pending() + self.update_healthcheck("Scheduler heartbeat") + time.sleep(60) + + except KeyboardInterrupt: + logger.info("Scheduler stopped by user") + if self.slack_notifier: + self.slack_notifier.send_info_notification( + message="Scheduler stopped by user interrupt", title="Scheduler Stopped" + ) - # Exit the scheduler - sys.exit(1) + except Exception as e: + logger.error(f"Scheduler crashed: {e}", exc_info=True) + if self.slack_notifier: + self.slack_notifier.send_failure_notification( + error_message=str(e), stage="Scheduler Runtime", execution_time=0 + ) + sys.exit(1) + + +if __name__ == "__main__": + scheduler = Scheduler() + scheduler.run() diff --git a/src/models/service_quality_oracle.py b/src/models/service_quality_oracle.py new file mode 100644 index 0000000..51573f8 --- /dev/null +++ b/src/models/service_quality_oracle.py @@ -0,0 +1,146 @@ +""" +Service Quality Oracle's core module for fetching & processing data. +This module serves as the entry point for the oracle functionality, responsible for: +1. Fetching eligibility data from BigQuery +2. Processing indexer data to determine eligibility +3. Submitting eligible indexers to the blockchain contract +4. Sending Slack notifications about run status +""" + +import logging +import sys +import time +from datetime import date, timedelta +from pathlib import Path + +# Add project root to path +project_root_path = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(project_root_path)) + +# Import data access utilities with absolute import +from src.models.bigquery_data_access_provider import BigQueryProvider +from src.models.blockchain_client import BlockchainClient +from src.models.eligibility_pipeline import EligibilityPipeline +from src.utils.configuration import credential_manager, load_config +from src.utils.slack_notifier import create_slack_notifier + +# Set up basic logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def main(run_date_override: date = None): + """ + Main entry point for the Service Quality Oracle. + This function: + 1. Sets up Google credentials (if not already set up by scheduler) + 2. Fetches and processes indexer eligibility data + 3. 
Submits eligible indexers to the blockchain + 4. Sends Slack notifications about the run status + + Args: + run_date_override: If provided, use this date for the run instead of today. + """ + start_time = time.time() + slack_notifier = None + stage = "Initialization" + + try: + # Configuration and credentials + credential_manager.setup_google_credentials() + config = load_config() + slack_notifier = create_slack_notifier(config.get("SLACK_WEBHOOK_URL")) + if slack_notifier: + logger.info("Slack notifications enabled") + else: + logger.info("Slack notifications disabled (no webhook URL configured)") + + # Define the date for the current run + current_run_date = run_date_override or date.today() + start_date = current_run_date - timedelta(days=config["BIGQUERY_ANALYSIS_PERIOD_DAYS"]) + end_date = current_run_date + + # --- Data Fetching Stage --- + stage = "Data Fetching from BigQuery" + logger.info(f"Fetching data from {start_date} to {end_date}") + + # Construct the full table name from configuration + table_name = ( + f"{config['BIGQUERY_PROJECT_ID']}.{config['BIGQUERY_DATASET_ID']}.{config['BIGQUERY_TABLE_ID']}" + ) + + bigquery_provider = BigQueryProvider( + project=config["BIGQUERY_PROJECT_ID"], + location=config["BIGQUERY_LOCATION_ID"], + table_name=table_name, + min_online_days=config["MIN_ONLINE_DAYS"], + min_subgraphs=config["MIN_SUBGRAPHS"], + max_latency_ms=config["MAX_LATENCY_MS"], + max_blocks_behind=config["MAX_BLOCKS_BEHIND"], + ) + eligibility_data = bigquery_provider.fetch_indexer_issuance_eligibility_data(start_date, end_date) + logger.info(f"Successfully fetched data for {len(eligibility_data)} indexers from BigQuery.") + + # --- Data Processing Stage --- + stage = "Data Processing and Artifact Generation" + pipeline = EligibilityPipeline(project_root=project_root_path) + eligible_indexers, _ = pipeline.process( + input_data_from_bigquery=eligibility_data, + current_date=current_run_date, + ) + logger.info(f"Found {len(eligible_indexers)} eligible indexers.") + + pipeline.clean_old_date_directories(config["MAX_AGE_BEFORE_DELETION"]) + + # --- Blockchain Submission Stage --- + stage = "Blockchain Submission" + logger.info("Instantiating BlockchainClient...") + blockchain_client = BlockchainClient( + rpc_providers=config["BLOCKCHAIN_RPC_URLS"], + contract_address=config["BLOCKCHAIN_CONTRACT_ADDRESS"], + project_root=project_root_path, + block_explorer_url=config["BLOCK_EXPLORER_URL"], + tx_timeout_seconds=config["TX_TIMEOUT_SECONDS"], + ) + transaction_links = blockchain_client.batch_allow_indexers_issuance_eligibility( + indexer_addresses=eligible_indexers, + private_key=config["PRIVATE_KEY"], + chain_id=config["BLOCKCHAIN_CHAIN_ID"], + contract_function=config["BLOCKCHAIN_FUNCTION_NAME"], + batch_size=config["BATCH_SIZE"], + replace=True, + ) + + # Calculate execution time and send success notification + execution_time = time.time() - start_time + logger.info(f"Oracle run completed successfully in {execution_time:.2f} seconds") + + if slack_notifier: + batch_count = len(transaction_links) if transaction_links else 0 + total_processed = len(eligible_indexers) + slack_notifier.send_success_notification( + eligible_indexers=eligible_indexers, + total_processed=total_processed, + execution_time=execution_time, + transaction_links=transaction_links, + batch_count=batch_count, + ) + + except Exception as e: + execution_time = time.time() - start_time + error_msg = f"Oracle failed at stage '{stage}': {str(e)}" + logger.error(error_msg, exc_info=True) + + if slack_notifier: + 
try: + slack_notifier.send_failure_notification( + error_message=str(e), stage=stage, execution_time=execution_time + ) + except Exception as slack_e: + logger.error(f"Failed to send Slack failure notification: {slack_e}", exc_info=True) + + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/models/subgraph_data_access_provider.py b/src/models/subgraph_data_access_provider.py index fe344be..6a60ae8 100644 --- a/src/models/subgraph_data_access_provider.py +++ b/src/models/subgraph_data_access_provider.py @@ -30,23 +30,31 @@ def __init__(self): Initialize the subgraph provider. Automatically loads configuration from config loader. """ - # Import here to avoid circular imports - from src.utils.config_loader import load_config + from src.utils.configuration import load_config # Load configuration config = load_config() + # Get subgraph URL and API key from config self.subgraph_url = config.get("subgraph_url") self.api_key = config.get("studio_api_key") - # Validate configuration + + # If the subgraph URL is not set, raise an error if not self.subgraph_url: raise ValueError("SUBGRAPH_URL_PRODUCTION not set in configuration") + + # Log the initialized subgraph provider logger.info(f"Initialized SubgraphProvider with endpoint: {self.subgraph_url}") + + # If the API key is set, log a message if self.api_key: logger.info("API key loaded for subgraph queries") + + # If the API key is not set, log a warning else: logger.warning("No API key found, subgraph queries may be limited") + def fetch_all_indexers(self) -> list[dict[str, Any]]: """ Fetch all indexers that have been input into the subgraph. @@ -68,6 +76,7 @@ def fetch_all_indexers(self) -> list[dict[str, Any]]: logger.info(f"Fetched {len(all_indexers)} total indexers from subgraph") return all_indexers + def get_indexer_eligibility_statuses(self, first: int = 1000, skip: int = 0) -> list[dict[str, Any]]: """ Get eligibility statuses for all indexers. @@ -96,6 +105,7 @@ def get_indexer_eligibility_statuses(self, first: int = 1000, skip: int = 0) -> logger.error(f"Unexpected response format: {result}") return [] + def execute_query(self, query: str, variables: Optional[dict[str, Any]] = None) -> dict[str, Any]: """ Execute a GraphQL query against the subgraph. 
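A minimal usage sketch of the updated SubgraphProvider (illustration only, not part of the patch): it mirrors the filtering performed by the TODO-marked _fetch_issuance_enabled_indexers_from_subgraph helper above, and assumes the environment variables consumed by load_config() are already exported and that the subgraph schema exposes an isDenied flag per indexer, as that helper expects.

    # Sketch only: run from the project root so `src` is importable, with the
    # environment variables referenced by config.toml exported for load_config().
    from src.models.subgraph_data_access_provider import SubgraphProvider

    provider = SubgraphProvider()  # resolves subgraph_url / studio_api_key from configuration
    indexers = provider.fetch_all_indexers()

    # Assumed schema: each indexer dict carries an `isDenied` flag, as in the helper above.
    denied = [ix["id"].lower() for ix in indexers if ix.get("isDenied", False)]
    print(f"{len(denied)} of {len(indexers)} indexers are currently flagged isDenied")
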
diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py deleted file mode 100644 index d8920f0..0000000 --- a/src/utils/config_loader.py +++ /dev/null @@ -1,266 +0,0 @@ -""" -Configuration Loader for Service Quality Oracle - -This module implements TOML + environment variables: -- Config is defined in TOML -- Sensitive values are loaded from environment variables - -Benefits: -- Clear separation between structure and sensitive data -- Production-ready for Docker -- Environment variable substitution with $VARIABLE_NAME syntax -""" - -import logging -import os -import re -import sys -from pathlib import Path -from typing import Any, Optional - -# Handle Python version compatibility for TOML loading -if sys.version_info >= (3, 11): - import tomllib -else: - import tomli as tomllib - -logger = logging.getLogger(__name__) - - -class ConfigurationError(Exception): - """Raised when configuration loading fails.""" - - pass - - -class ConfigLoader: - """Configuration loader with environment variable substitution""" - - def __init__(self, config_path: Optional[str] = None): - """Initialize the config loader""" - self.config_path = config_path or self._get_default_config_path() - self._env_var_pattern = re.compile(r"\$([A-Z_][A-Z0-9_]*)") - - def _get_default_config_path(self) -> str: - """Get the default configuration template path.""" - # Check if we're in a Docker container - docker_path = Path("/app/config.toml") - if docker_path.exists(): - return str(docker_path) - - # For local development, look in project root - current_path = Path(__file__).parent - while current_path != current_path.parent: - config_path = current_path / "config.toml" - if config_path.exists(): - return str(config_path) - current_path = current_path.parent - - raise ConfigurationError("Could not find config.toml in project root or Docker container") - - # TODO: check this... - def _substitute_env_vars(self, config_toml: Any) -> Any: - """ - Recursively substitute environment variables in the config. - - Supports $VARIABLE_NAME syntax for environment variable substitution. - - Args: - config_toml: config file to process - - Returns: - Processed config with environment variables substituted - - Raises: - ConfigurationError: If required environment variable is missing - """ - if isinstance(config_toml, str): - # Find all environment variable references - env_vars = self._env_var_pattern.findall(config_toml) - - for env_var in env_vars: - env_value = os.getenv(env_var) - if env_value is None: - raise ConfigurationError(f"Required environment variable {env_var} is not set") - - # Replace the environment variable reference with actual value - config_toml = config_toml.replace(f"${env_var}", env_value) - - return config_toml - - elif isinstance(config_toml, dict): - return {k: self._substitute_env_vars(v) for k, v in config_toml.items()} - - elif isinstance(config_toml, list): - return [self._substitute_env_vars(item) for item in config_toml] - - else: - return config_toml - - def load_config(self) -> dict[str, Any]: - """ - Load configuration from config.toml and substitute environment variables. 
- - Returns: - Dictionary containing the complete configuration with secrets loaded - from environment variables - - Raises: - ConfigurationError: If config file is missing or env vars are missing - """ - try: - # Load the TOML configuration - with open(self.config_path, "rb") as f: - config = tomllib.load(f) - - logger.info(f"Loaded configuration from: {self.config_path}") - - except FileNotFoundError: - raise ConfigurationError(f"Configuration not found: {self.config_path}") from None - except Exception as e: - raise ConfigurationError(f"Failed to parse configuration: {e}") from e - - try: - # Substitute environment variables throughout the configuration - config = self._substitute_env_vars(config) - - logger.info("Successfully loaded configuration with environment variables") - return config - - except ConfigurationError: - raise - except Exception as e: - raise ConfigurationError(f"Failed to substitute environment variables: {e}") from e - - def validate_required_env_vars(self) -> None: - """ - Validate that all required environment variables are set without loading full config. - - This can be used for early validation in startup scripts. - - Raises: - ConfigurationError: If any required environment variables are missing - """ - # Load the config file - try: - with open(self.config_path, "rb") as f: - config = tomllib.load(f) - - # If there is an error, raise a ConfigurationError - except Exception as e: - raise ConfigurationError(f"Cannot validate env vars - config error: {e}") from e - - # Collect all missing environment variables from config object - missing_vars = self._collect_missing_env_vars(config) - - # If there are missing variables, raise a ConfigurationError - if missing_vars: - raise ConfigurationError( - f"Missing required environment variables: {', '.join(sorted(set(missing_vars)))}" - ) - - def _collect_missing_env_vars(self, obj: Any) -> list[str]: - """ - Collect all missing environment variables from config object. - - Args: - obj: config object to collect missing environment variables from - - Returns: - list of missing environment variables (if any) - """ - missing_vars = [] - # Collect the missing enviroment vaiables using the appropriate speicifc method - if isinstance(obj, str): - env_vars = self._env_var_pattern.findall(obj) - for var in env_vars: - if os.getenv(var) is None: - missing_vars.append(var) - elif isinstance(obj, dict): - for value in obj.values(): - missing_vars.extend(self._collect_missing_env_vars(value)) - elif isinstance(obj, list): - for item in obj: - missing_vars.extend(self._collect_missing_env_vars(item)) - - # After all the missing variables have been collected, return the list - return missing_vars - - def get_flat_config(self) -> dict[str, Any]: - """ - Get configuration in flat format. 
- - Returns: - Flat dictionary with all configuration values - """ - config = self.load_config() - - # Convert nested structure to flat format - flat_config = { - # BigQuery settings - "bigquery_location": config.get("bigquery", {}).get("BIGQUERY_LOCATION_ID", "US"), - "bigquery_project_id": config.get("bigquery", {}).get("BIGQUERY_PROJECT_ID", "graph-mainnet"), - "bigquery_dataset_id": config.get("bigquery", {}).get("BIGQUERY_DATASET_ID", "internal_metrics"), - # Blockchain settings - "contract_address": config.get("blockchain", {}).get("BLOCKCHAIN_CONTRACT_ADDRESS"), - "contract_function": config.get("blockchain", {}).get("BLOCKCHAIN_FUNCTION_NAME"), - "chain_id": config.get("blockchain", {}).get("BLOCKCHAIN_CHAIN_ID"), - "rpc_providers": self._parse_rpc_urls(config.get("blockchain", {}).get("BLOCKCHAIN_RPC_URLS", [])), - # Scheduling - "scheduled_run_time": config.get("scheduling", {}).get("SCHEDULED_RUN_TIME"), - # Subgraph URLs - "subgraph_url": config.get("subgraph", {}).get("SUBGRAPH_URL_PRODUCTION"), - # Processing settings - "batch_size": config.get("processing", {}).get("BATCH_SIZE", 125), - "max_age_before_deletion": config.get("processing", {}).get("MAX_AGE_BEFORE_DELETION", 120), - # Secrets - "google_application_credentials": config.get("secrets", {}).get("GOOGLE_APPLICATION_CREDENTIALS"), - "private_key": config.get("secrets", {}).get("BLOCKCHAIN_PRIVATE_KEY"), - "studio_api_key": config.get("secrets", {}).get("STUDIO_API_KEY"), - "slack_webhook_url": config.get("secrets", {}).get("SLACK_WEBHOOK_URL"), - } - - return flat_config - - def _parse_rpc_urls(self, rpc_urls: list) -> list[str]: - """Parse RPC URLs from list format.""" - if not rpc_urls: - raise ConfigurationError("BLOCKCHAIN_RPC_URLS is required") - - if not isinstance(rpc_urls, list) or not all(isinstance(url, str) for url in rpc_urls): - raise ConfigurationError("RPC URLs must be a list of strings") - - valid_providers = [url.strip() for url in rpc_urls if url.strip()] - if not valid_providers: - raise ConfigurationError("No valid RPC providers found") - - return valid_providers - - -# Convenience function for easy integration with existing code -def load_config() -> dict[str, Any]: - """ - Convenience function to load configuration. - - Returns configuration in flat format compatible with existing codebase. - - Returns: - Dictionary containing configuration with secrets from environment variables - - Raises: - ConfigurationError: If configuration loading fails - """ - loader = ConfigLoader() - return loader.get_flat_config() - - -# For startup validation -def validate_all_required_env_vars() -> None: - """ - Validate that all required environment variables are set. - - Raises: - ConfigurationError: If any required environment variables are missing - """ - loader = ConfigLoader() - loader.validate_required_env_vars() diff --git a/src/utils/configuration.py b/src/utils/configuration.py new file mode 100644 index 0000000..2a1ad07 --- /dev/null +++ b/src/utils/configuration.py @@ -0,0 +1,429 @@ +""" +Centralized configuration and credential management for the Service Quality Oracle. 
+""" + +import json +import logging +import os +import re +import sys +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +# Handle Python version compatibility for TOML loading +if sys.version_info >= (3, 11): + import tomllib +else: + import tomli as tomllib + +logger = logging.getLogger(__name__) + + +class ConfigurationError(Exception): + """Raised when configuration loading or validation fails.""" + + pass + + +# --- Configuration Loading --- + + +class ConfigLoader: + """Internal class to load configuration from TOML and environment variables.""" + + def __init__(self, config_path: Optional[str] = None): + """Initialize the config loader""" + self.config_path = config_path or self._get_default_config_path() + self._env_var_pattern = re.compile(r"\$([A-Z_][A-Z0-9_]*)") + + + def _get_default_config_path(self) -> str: + """Get the default configuration template path.""" + # Check if we're in a Docker container + docker_path = Path("/app/config.toml") + if docker_path.exists(): + return str(docker_path) + + # For local development, look in project root + current_path = Path(__file__).parent + while current_path != current_path.parent: + config_path = current_path / "config.toml" + if config_path.exists(): + return str(config_path) + current_path = current_path.parent + + raise ConfigurationError("Could not find config.toml in project root or Docker container") + + + def _substitute_env_vars(self, config_toml: Any) -> Any: + """ + Recursively substitute environment variables in the config. + + Supports $VARIABLE_NAME syntax for environment variable substitution. + + Args: + config_toml: config file to process + + Returns: + Processed config with environment variables substituted + + Raises: + ConfigurationError: If required environment variable is missing + """ + if isinstance(config_toml, str): + # Find all environment variable references + env_vars = self._env_var_pattern.findall(config_toml) + + for env_var in env_vars: + env_value = os.getenv(env_var) + if env_value is None: + raise ConfigurationError(f"Required environment variable {env_var} is not set") + + # Replace the environment variable reference with actual value + config_toml = config_toml.replace(f"${env_var}", env_value) + + return config_toml + + elif isinstance(config_toml, dict): + return {k: self._substitute_env_vars(v) for k, v in config_toml.items()} + + elif isinstance(config_toml, list): + return [self._substitute_env_vars(item) for item in config_toml] + + return config_toml + + + def _get_raw_config(self) -> dict: + """ + Get raw configuration from TOML file. + + Returns: + toml file as a dictionary + """ + try: + with open(self.config_path, "rb") as f: + return tomllib.load(f) + + except FileNotFoundError as e: + raise ConfigurationError(f"Configuration not found: {self.config_path}") from e + + except Exception as e: + raise ConfigurationError(f"Failed to parse configuration: {e}") from e + + + def get_flat_config(self) -> dict[str, Any]: + """ + Get configuration in flat format. 
+ + Returns: + Flat dictionary with all configuration values + """ + raw_config = self._get_raw_config() + substituted_config = self._substitute_env_vars(raw_config) + + # Helper to safely convert values to integers + + + def to_int(v): + return int(v) if v is not None and v != "" else None + + # fmt: off + # Convert nested structure to flat format + return { + # BigQuery settings + "BIGQUERY_LOCATION_ID": substituted_config.get("bigquery", {}).get("BIGQUERY_LOCATION_ID"), + "BIGQUERY_PROJECT_ID": substituted_config.get("bigquery", {}).get("BIGQUERY_PROJECT_ID"), + "BIGQUERY_DATASET_ID": substituted_config.get("bigquery", {}).get("BIGQUERY_DATASET_ID"), + "BIGQUERY_TABLE_ID": substituted_config.get("bigquery", {}).get("BIGQUERY_TABLE_ID"), + + # Eligibility Criteria + "MIN_ONLINE_DAYS": to_int(substituted_config.get("eligibility_criteria", {}).get("MIN_ONLINE_DAYS")), + "MIN_SUBGRAPHS": to_int(substituted_config.get("eligibility_criteria", {}).get("MIN_SUBGRAPHS")), + "MAX_LATENCY_MS": to_int(substituted_config.get("eligibility_criteria", {}).get("MAX_LATENCY_MS")), + "MAX_BLOCKS_BEHIND": to_int(substituted_config.get("eligibility_criteria", {}).get("MAX_BLOCKS_BEHIND")), + + # Blockchain settings + "BLOCKCHAIN_CONTRACT_ADDRESS": substituted_config.get("blockchain", {}).get("BLOCKCHAIN_CONTRACT_ADDRESS"), + "BLOCKCHAIN_FUNCTION_NAME": substituted_config.get("blockchain", {}).get("BLOCKCHAIN_FUNCTION_NAME"), + "BLOCKCHAIN_CHAIN_ID": to_int(substituted_config.get("blockchain", {}).get("BLOCKCHAIN_CHAIN_ID")), + "BLOCKCHAIN_RPC_URLS": self._parse_rpc_urls(substituted_config.get("blockchain", {}).get("BLOCKCHAIN_RPC_URLS")), + "BLOCK_EXPLORER_URL": substituted_config.get("blockchain", {}).get("BLOCK_EXPLORER_URL"), + "TX_TIMEOUT_SECONDS": to_int(substituted_config.get("blockchain", {}).get("TX_TIMEOUT_SECONDS")), + + # Scheduling + "SCHEDULED_RUN_TIME": substituted_config.get("scheduling", {}).get("SCHEDULED_RUN_TIME"), + + # Subgraph URLs + "SUBGRAPH_URL_PRE_PRODUCTION": substituted_config.get("subgraph", {}).get("SUBGRAPH_URL_PRE_PRODUCTION"), + "SUBGRAPH_URL_PRODUCTION": substituted_config.get("subgraph", {}).get("SUBGRAPH_URL_PRODUCTION"), + + # Processing settings + "BATCH_SIZE": to_int(substituted_config.get("processing", {}).get("BATCH_SIZE")), + "MAX_AGE_BEFORE_DELETION": to_int(substituted_config.get("processing", {}).get("MAX_AGE_BEFORE_DELETION")), + "BIGQUERY_ANALYSIS_PERIOD_DAYS": to_int(substituted_config.get("processing", {}).get("BIGQUERY_ANALYSIS_PERIOD_DAYS")), + + # Secrets + "GOOGLE_APPLICATION_CREDENTIALS": substituted_config.get("secrets", {}).get("GOOGLE_APPLICATION_CREDENTIALS"), + "PRIVATE_KEY": substituted_config.get("secrets", {}).get("BLOCKCHAIN_PRIVATE_KEY"), + "STUDIO_API_KEY": substituted_config.get("secrets", {}).get("STUDIO_API_KEY"), + "STUDIO_DEPLOY_KEY": substituted_config.get("secrets", {}).get("STUDIO_DEPLOY_KEY"), + "SLACK_WEBHOOK_URL": substituted_config.get("secrets", {}).get("SLACK_WEBHOOK_URL"), + "ETHERSCAN_API_KEY": substituted_config.get("secrets", {}).get("ETHERSCAN_API_KEY"), + "ARBITRUM_API_KEY": substituted_config.get("secrets", {}).get("ARBITRUM_API_KEY"), + } + # fmt: on + + + def _parse_rpc_urls(self, rpc_urls: Optional[list]) -> list[str]: + """Parse RPC URLs from list format.""" + if not rpc_urls or not isinstance(rpc_urls, list) or not all(isinstance(url, str) for url in rpc_urls): + return [] + + valid_providers = [url.strip() for url in rpc_urls if url.strip()] + if not valid_providers: + return [] + + return valid_providers + + + def 
_collect_missing_env_vars(self, obj: Any) -> list[str]:
+        """
+        Collect all missing environment variables from config object.
+
+        Args:
+            obj: config object to collect missing environment variables from
+
+        Returns:
+            list of missing environment variables (if any)
+        """
+        missing = []
+        # Collect missing environment variables, recursing through strings, dicts, and lists as appropriate
+        if isinstance(obj, str):
+            env_vars = self._env_var_pattern.findall(obj)
+            for var in env_vars:
+                if os.getenv(var) is None:
+                    missing.append(var)
+
+        elif isinstance(obj, dict):
+            for value in obj.values():
+                missing.extend(self._collect_missing_env_vars(value))
+
+        elif isinstance(obj, list):
+            for item in obj:
+                missing.extend(self._collect_missing_env_vars(item))
+
+        # After all the missing variables have been collected, return the list
+        return missing
+
+
+    def get_missing_env_vars(self) -> list[str]:
+        raw_config = self._get_raw_config()
+        return self._collect_missing_env_vars(raw_config)
+
+
+def _validate_config(config: dict[str, Any]) -> dict[str, Any]:
+    # Define required fields. All other fields from `get_flat_config` are considered optional.
+    required = [
+        "BIGQUERY_LOCATION_ID",
+        "BIGQUERY_PROJECT_ID",
+        "BIGQUERY_DATASET_ID",
+        "BIGQUERY_TABLE_ID",
+        "MIN_ONLINE_DAYS",
+        "MIN_SUBGRAPHS",
+        "MAX_LATENCY_MS",
+        "MAX_BLOCKS_BEHIND",
+        "BLOCKCHAIN_CONTRACT_ADDRESS",
+        "BLOCKCHAIN_FUNCTION_NAME",
+        "BLOCKCHAIN_CHAIN_ID",
+        "BLOCKCHAIN_RPC_URLS",
+        "BLOCK_EXPLORER_URL",
+        "TX_TIMEOUT_SECONDS",
+        "SCHEDULED_RUN_TIME",
+        "SUBGRAPH_URL_PRE_PRODUCTION",
+        "SUBGRAPH_URL_PRODUCTION",
+        "BATCH_SIZE",
+        "MAX_AGE_BEFORE_DELETION",
+        "BIGQUERY_ANALYSIS_PERIOD_DAYS",
+        "PRIVATE_KEY",
+        "STUDIO_API_KEY",
+        "STUDIO_DEPLOY_KEY",
+        "SLACK_WEBHOOK_URL",
+        "ETHERSCAN_API_KEY",
+        "ARBITRUM_API_KEY",
+    ]
+    missing = [field for field in required if not config.get(field)]
+    if missing:
+        raise ConfigurationError(
+            f"Missing required configuration fields in config.toml or environment variables: {', '.join(sorted(missing))}"
+        )
+
+    # Validate specific field formats
+    try:
+        # The int() casts in get_flat_config will handle type errors for numeric fields.
+        datetime.strptime(config["SCHEDULED_RUN_TIME"], "%H:%M")
+    except (ValueError, TypeError):
+        raise ConfigurationError(
+            f"Invalid SCHEDULED_RUN_TIME: {config['SCHEDULED_RUN_TIME']} - must be in HH:MM format."
+        )
+
+    return config
+
+
+def load_config() -> dict[str, Any]:
+    """Loads, validates, and returns the application configuration."""
+    loader = ConfigLoader()
+    flat_config = loader.get_flat_config()
+    logger.info("Successfully loaded configuration")
+    return _validate_config(flat_config)
+
+
+def validate_all_required_env_vars() -> None:
+    """Validates that all required environment variables are set."""
+    loader = ConfigLoader()
+    missing = loader.get_missing_env_vars()
+    if missing:
+        raise ConfigurationError(f"Missing required environment variables: {', '.join(sorted(set(missing)))}")
+    logger.info("Successfully validated all required environment variables")
+
+
+# --- Credential Management ---
+
+
+class CredentialManager:
+    """Handles credential management for Google Cloud services."""
+
+
+    def _parse_and_validate_credentials_json(self, creds_env: str) -> dict:
+        """
+        Parse and validate Google credentials JSON from environment variable.
+ + Args: + creds_env: JSON string containing credentials + + Returns: + dict: Parsed and validated credentials data + + Raises: + ValueError: If JSON is invalid or credentials are incomplete + """ + # Try to parse the credentials + try: + # Parse the credentials + creds_data = json.loads(creds_env) + cred_type = creds_data.get("type", "") + + # Validate the credentials data based on the type + if cred_type == "authorized_user": + required = ["client_id", "client_secret", "refresh_token"] + if not all(k in creds_data for k in required): + raise ValueError("Incomplete authorized_user credentials") + + elif cred_type == "service_account": + required = ["private_key", "client_email", "project_id"] + if not all(k in creds_data for k in required): + raise ValueError("Incomplete service_account credentials") + + else: + raise ValueError(f"Unsupported credential type: '{cred_type}'") + + return creds_data + + except Exception as e: + raise ValueError(f"Invalid credentials JSON: {e}") from e + + + def _setup_user_credentials_from_dict(self, creds_data: dict) -> None: + """Set up user account credentials directly from a dictionary.""" + import google.auth + from google.oauth2.credentials import Credentials + + # Try to set up the credentials + try: + credentials = Credentials( + token=None, + refresh_token=creds_data.get("refresh_token"), + client_id=creds_data.get("client_id"), + client_secret=creds_data.get("client_secret"), + token_uri="https://oauth2.googleapis.com/token", + ) + + # Set credentials globally for GCP libraries + google.auth._default._CREDENTIALS = credentials # type: ignore[attr-defined] + logger.info("Successfully loaded user account credentials from environment variable") + + # Clear credentials from memory + finally: + if "creds_data" in locals(): + creds_data.clear() + + + def _setup_service_account_credentials_from_dict(self, creds_data: dict) -> None: + """Set up service account credentials directly from a dictionary.""" + import google.auth + from google.oauth2 import service_account + + # Try to set up the credentials + try: + # Create credentials object directly from dict + credentials = service_account.Credentials.from_service_account_info(creds_data) + + # Set credentials globally for GCP libraries + google.auth._default._CREDENTIALS = credentials # type: ignore[attr-defined] + logger.info("Successfully loaded service account credentials from environment variable") + + # If the credentials creation fails, raise an error + except Exception as e: + raise ValueError(f"Invalid service account credentials: {e}") from e + + # Clear the original credentials dict from memory if it exists + finally: + if "creds_data" in locals(): + creds_data.clear() + + + def setup_google_credentials(self) -> None: + """ + Set up Google credentials directly in memory from environment variable. + This function handles multiple credential formats securely: + 1. JSON string in GOOGLE_APPLICATION_CREDENTIALS (inline credentials) + 2. File path in GOOGLE_APPLICATION_CREDENTIALS + """ + # Get the account credentials from the environment variable + creds_env = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + + # If the credentials are not set, log a warning and return + if not creds_env: + logger.warning("GOOGLE_APPLICATION_CREDENTIALS not set. 
Falling back to gcloud CLI.") + return + + # Case 1: JSON credentials provided inline + if creds_env.strip().startswith("{"): + creds_data = None + try: + # Parse and validate the credentials + creds_data = self._parse_and_validate_credentials_json(creds_env) + + # Set up the credentials based on the type + if creds_data.get("type") == "authorized_user": + self._setup_user_credentials_from_dict(creds_data.copy()) + else: + self._setup_service_account_credentials_from_dict(creds_data.copy()) + + # If the credentials parsing fails, raise an error + except Exception as e: + raise ValueError(f"Error processing inline credentials: {e}") from e + + # Clear the credentials from memory + finally: + if creds_data: + creds_data.clear() + + # Case 2: File path provided + elif not os.path.exists(creds_env): + logger.warning("GOOGLE_APPLICATION_CREDENTIALS is not valid JSON or a file path.") + logger.warning("Falling back to gcloud CLI authentication if available.") + + +# Global instance for easy access +credential_manager = CredentialManager() diff --git a/src/utils/key_validator.py b/src/utils/key_validator.py index 69a1e04..a5a097b 100644 --- a/src/utils/key_validator.py +++ b/src/utils/key_validator.py @@ -6,8 +6,6 @@ import logging import re -from dataclasses import dataclass -from typing import Optional logger = logging.getLogger(__name__) @@ -18,34 +16,22 @@ class KeyValidationError(Exception): pass -@dataclass -class KeyValidationResult: - """Result of private key validation.""" - - is_valid: bool - formatted_key: Optional[str] - error_message: Optional[str] - - -def validate_private_key(private_key: str) -> KeyValidationResult: +def validate_and_format_private_key(private_key: str) -> str: """ - Validate and format a private key. + Validate and format a private key, raising an exception if invalid. + Ensures the key is a 64-character hex string and adds the '0x' prefix. Args: private_key: Raw private key string Returns: - KeyValidationResult object with validation status, formatted key, and error message + Formatted private key string Raises: KeyValidationError: If key validation fails """ if not private_key or not isinstance(private_key, str): - return KeyValidationResult( - is_valid=False, - formatted_key=None, - error_message="Private key must be a non-empty string", - ) + raise KeyValidationError("Private key must be a non-empty string") # Remove whitespace and common prefixes clean_key = private_key.strip() @@ -58,35 +44,7 @@ def validate_private_key(private_key: str) -> KeyValidationResult: # Validate hex format (64 characters) if not re.match(r"^[0-9a-fA-F]{64}$", hex_key): - return KeyValidationResult( - is_valid=False, - formatted_key=None, - error_message="Private key must be 64 hex characters", - ) + raise KeyValidationError("Private key must be 64 hex characters") # Return formatted key with 0x prefix - formatted_key = f"0x{hex_key.lower()}" - return KeyValidationResult( - is_valid=True, - formatted_key=formatted_key, - error_message=None, - ) - - -def validate_and_format_private_key(private_key: str) -> str: - """ - Validate and format a private key, raising an exception if invalid. 
- - Args: - private_key: Raw private key string - - Returns: - Formatted private key string - - Raises: - KeyValidationError: If key validation fails - """ - result = validate_private_key(private_key) - if not result.is_valid: - raise KeyValidationError(f"Invalid private key: {result.error_message}") - return result.formatted_key + return f"0x{hex_key.lower()}" diff --git a/src/utils/retry_decorator.py b/src/utils/retry_decorator.py new file mode 100644 index 0000000..a3eff8f --- /dev/null +++ b/src/utils/retry_decorator.py @@ -0,0 +1,60 @@ +""" +Standardized retry decorator with consistent backoff strategy across the application. +""" + +import logging +from functools import wraps +from typing import Any, Callable, Type, Union + +from tenacity import ( + before_sleep_log, + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential, +) + +logger = logging.getLogger(__name__) + + +# fmt: off +def retry_with_backoff( + max_attempts: int = 5, + min_wait: int = 1, + max_wait: int = 120, + multiplier: int = 2, + exceptions: Union[Type[Exception], tuple[Type[Exception], ...]] = Exception, + reraise: bool = True, +) -> Callable: + """ + Retry decorator with exponential backoff. + + Args: + max_attempts: Maximum number of retry attempts (default: 5) + min_wait: Minimum wait time between retries in seconds (default: 1) + max_wait: Maximum wait time between retries in seconds (default: 120) + multiplier: Exponential backoff multiplier (default: 2) + exceptions: Exception types to retry on (default: Exception) + reraise: Whether to reraise the exception after all attempts fail (default: True) + + Returns: + Decorated function with retry logic + """ + def decorator(func: Callable) -> Callable: + """Retry decorator with exponential backoff.""" + @retry( + retry=retry_if_exception_type(exceptions), + stop=stop_after_attempt(max_attempts), + wait=wait_exponential(multiplier=multiplier, min=min_wait, max=max_wait), + before_sleep=before_sleep_log(logger, logging.WARNING), + reraise=reraise, + ) + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + return func(*args, **kwargs) + + return wrapper + + + return decorator +# fmt: on diff --git a/src/utils/slack_notifier.py b/src/utils/slack_notifier.py index 8839255..20921b1 100644 --- a/src/utils/slack_notifier.py +++ b/src/utils/slack_notifier.py @@ -7,7 +7,9 @@ from datetime import datetime from typing import Dict, List, Optional -import requests +import requests # type: ignore[import-untyped] + +from src.utils.retry_decorator import retry_with_backoff # Module-level logger logger = logging.getLogger(__name__) @@ -25,8 +27,11 @@ def __init__(self, webhook_url: str) -> None: """ self.webhook_url = webhook_url self.timeout = 10 # seconds - self.max_attempts = 8 + + @retry_with_backoff( + max_attempts=8, min_wait=1, max_wait=128, exceptions=(requests.exceptions.RequestException,) + ) def _send_message(self, payload: Dict) -> bool: """ Send a message to Slack via webhook with exponential backoff retry. 
@@ -37,50 +42,36 @@ def _send_message(self, payload: Dict) -> bool: Returns: bool: True if message was sent successfully, False otherwise """ - import time - # Get the message type from the payload message_type = payload.get("text", "Unknown") # Log the message type logger.info(f"Sending Slack notification: {message_type}") - # Attempt to send the message 3 times with exponential backoff - for attempt in range(self.max_attempts): - try: - response = requests.post( - self.webhook_url, - json=payload, - timeout=self.timeout, - headers={"Content-Type": "application/json"}, - ) - - # If the message is sent successfully, return True - if response.status_code == 200: - logger.info("Slack notification sent successfully") - return True - + try: + response = requests.post( + self.webhook_url, + json=payload, + timeout=self.timeout, + headers={"Content-Type": "application/json"}, + ) + + # If the message is sent successfully, return True + if response.status_code == 200: + logger.info("Slack notification sent successfully") + return True + else: # log message failure - else: - logger.warning(f"Slack notification failed: {response.status_code}") + logger.warning(f"Slack notification failed: {response.status_code}") + # Raise an exception to trigger retry + response.raise_for_status() + return False - # If there is an error when trying to send the message, log the error - except requests.exceptions.RequestException as e: - logger.warning(f"Slack notification attempt {attempt + 1} failed: {str(e)}") + # If there is an error when trying to send the message, log the error and re-raise + except requests.exceptions.RequestException as e: + logger.warning(f"Slack notification failed: {str(e)}") + raise - # If the last attempt fails, log an error - if attempt == self.max_attempts - 1: - logger.error("All Slack notification attempts failed") - - # If the attempt is not the last, wait for the exponential backoff and retry - else: - # Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s - wait_time = 2**attempt - logger.info(f"Retrying in {wait_time} seconds...") - time.sleep(wait_time) - - # If the message is not sent successfully, return False - return False def _create_payload(self, text: str, fields: List[Dict], color: str = "good") -> Dict: """Create a Slack message payload.""" @@ -96,6 +87,7 @@ def _create_payload(self, text: str, fields: List[Dict], color: str = "good") -> ], } + def send_success_notification( self, eligible_indexers: List[str], @@ -146,6 +138,7 @@ def send_success_notification( # Send message payload to Slack return self._send_message(payload) + def send_failure_notification( self, error_message: str, @@ -199,6 +192,7 @@ def send_failure_notification( # Send message payload to Slack return self._send_message(payload) + def send_info_notification(self, message: str, title: str = "Info") -> bool: """ Send an informational notification to Slack. diff --git a/tests/placeholder.py b/tests/placeholder.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_blockchain_client.py b/tests/test_blockchain_client.py new file mode 100644 index 0000000..d5c021e --- /dev/null +++ b/tests/test_blockchain_client.py @@ -0,0 +1,8 @@ +""" +Unit tests for the BlockchainClient. 
+""" + +# TODO: Add tests for RPC failover +# TODO: Add tests for gas estimation +# TODO: Add tests for nonce management +# TODO: Add tests for transaction batching diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 0000000..275cc26 --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,8 @@ +""" +Unit tests for the configuration loader and validator. +""" + +# TODO: Add test for successful config loading +# TODO: Add test for missing required config value (should raise ConfigurationError) +# TODO: Add test for invalid config value (e.g., bad time format) +# TODO: Add test for environment variable substitution diff --git a/tests/test_eligibility_pipeline.py b/tests/test_eligibility_pipeline.py new file mode 100644 index 0000000..ccaab92 --- /dev/null +++ b/tests/test_eligibility_pipeline.py @@ -0,0 +1,8 @@ +""" +Unit tests for the EligibilityPipeline. +""" + +# TODO: Add test for data processing logic +# TODO: Add test for CSV artifact generation +# TODO: Add test for date directory cleanup +# TODO: Add test for dataframe validation diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..826138a --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,7 @@ +""" +Unit tests for the Scheduler. +""" + +# TODO: Add test for scheduled job execution +# TODO: Add test for missed run detection and execution +# TODO: Add test for healthcheck file updates diff --git a/tests/test_service_quality_oracle.py b/tests/test_service_quality_oracle.py new file mode 100644 index 0000000..287bc84 --- /dev/null +++ b/tests/test_service_quality_oracle.py @@ -0,0 +1,7 @@ +""" +Unit tests for the main ServiceQualityOracle orchestrator. +""" + +# TODO: Add end-to-end test with mocked dependencies +# TODO: Add test for successful run notification +# TODO: Add test for failure run notification