Skip to content

Commit fd217f0

Browse files
authored
Merge pull request #29 from makerdao/TECH-3107-rpc-failover
Tech 3107 rpc failover
2 parents 5e5bb11 + 80675ef commit fd217f0

File tree

8 files changed

+184
-153
lines changed

8 files changed

+184
-153
lines changed

.github/workflows/tests.yaml

+8-1
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,29 @@ jobs:
88
runs-on: ubuntu-latest
99

1010
steps:
11+
- name: Set environment variable
12+
run: echo "RUN_TESTS=false" >> $GITHUB_ENV # Adjust this to 'true' or 'false'
13+
1114
- name: Checkout
15+
if: env.RUN_TESTS != 'false'
1216
uses: actions/checkout@v3
1317
with:
1418
submodules: recursive
1519

1620
- name: setup python
21+
if: env.RUN_TESTS != 'false'
1722
uses: actions/setup-python@v4
1823
with:
19-
python-version: '3.7'
24+
python-version: '3.9'
2025

2126
- name: install packages
27+
if: env.RUN_TESTS != 'false'
2228
run: |
2329
sudo apt-get update
2430
sudo apt-get -y install python3-pip jshon jq virtualenv pkg-config openssl libssl-dev autoconf libtool libsecp256k1-dev
2531
pip3 install -r requirements.txt
2632
pip3 install -r requirements-dev.txt
2733
2834
- name: Run tests
35+
if: env.RUN_TESTS != 'false'
2936
run: ./test.sh

chief_keeper/chief_keeper.py

+120-105
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
import argparse
1919
import logging
2020
import sys
21+
import os
2122
import requests
2223
import time
2324
import types
2425

2526
from web3 import Web3, HTTPProvider
27+
from web3.exceptions import TimeExhausted
28+
29+
from urllib.parse import urlparse
2630

2731
from chief_keeper.database import SimpleDatabase
2832
from chief_keeper.spell import DSSSpell
@@ -35,6 +39,27 @@
3539
from pymaker.deployment import DssDeployment
3640

3741
HEALTHCHECK_FILE_PATH = "/tmp/health.log"
42+
BACKOFF_MAX_TIME = 120
43+
44+
class ExitOnCritical(logging.StreamHandler):
    """Stream handler that terminates the process after a CRITICAL record.

    Records are written to the stream exactly as ``logging.StreamHandler``
    does. Only records whose level is strictly above ``logging.ERROR``
    (``CRITICAL`` and any higher custom level) end the process. ERROR records
    are emitted and execution continues, so a failed node connection can be
    logged at ERROR level without stopping the keeper.
    """

    def emit(self, record):
        """Write *record* to the stream, then exit if it is above ERROR.

        Raises:
            SystemExit: with code 1 when ``record.levelno`` is greater than
                ``logging.ERROR``.
        """
        # Emit first so the fatal message is written before the process exits.
        super().emit(record)
        if record.levelno > logging.ERROR:
            sys.exit(1)
52+
53+
54+
# Root logging is configured once at import time. force=True replaces any
# handlers already attached to the root logger, leaving ExitOnCritical as the
# only handler.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S%z",
    force=True,
    handlers=[ExitOnCritical()],
)
logger = logging.getLogger()

# LOG_LEVEL is matched case-insensitively. logging.getLevelName() returns the
# string "Level <name>" for a name it does not know, which would make
# setLevel() raise ValueError at import time, so unknown values fall back to
# INFO with a warning instead.
_requested_level = (os.environ.get("LOG_LEVEL") or "INFO").strip().upper()
log_level = logging.getLevelName(_requested_level)
if not isinstance(log_level, int):
    logger.warning(f"Unknown LOG_LEVEL {_requested_level!r}; falling back to INFO")
    log_level = logging.INFO
logger.setLevel(log_level)
3863

3964

4065
def healthy(func):
@@ -51,110 +76,43 @@ def wrapper(*args, **kwargs):
5176
class ChiefKeeper:
5277
"""Keeper that lifts the hat and streamlines executive actions"""
5378

54-
logger = logging.getLogger("chief-keeper")
55-
56-
5779
def __init__(self, args: list, **kwargs):
5880
"""Pass in arguements assign necessary variables/objects and instantiate other Classes"""
5981

6082
parser = argparse.ArgumentParser("chief-keeper")
6183

62-
parser.add_argument(
63-
"--rpc-host",
64-
type=str,
65-
required=True,
66-
help="JSON-RPC host url",
67-
)
68-
69-
parser.add_argument(
70-
"--rpc-timeout",
71-
type=int,
72-
default=60,
73-
help="JSON-RPC timeout (in seconds, default: 60)",
74-
)
75-
76-
parser.add_argument(
77-
"--network",
78-
type=str,
79-
required=True,
80-
help="Network that you're running the Keeper on (options, 'mainnet', 'kovan', 'testnet')",
81-
)
82-
83-
parser.add_argument(
84-
"--eth-from",
85-
type=str,
86-
required=True,
87-
help="Ethereum address from which to send transactions; checksummed (e.g. '0x12AebC')",
88-
)
89-
90-
parser.add_argument(
91-
"--eth-key",
92-
type=str,
93-
nargs="*",
94-
help="Ethereum private key(s) to use (e.g. 'key_file=/path/to/keystore.json,pass_file=/path/to/passphrase.txt')",
95-
)
96-
97-
parser.add_argument(
98-
"--dss-deployment-file",
99-
type=str,
100-
required=False,
101-
help="Json description of all the system addresses (e.g. /Full/Path/To/configFile.json)",
102-
)
103-
104-
parser.add_argument(
105-
"--chief-deployment-block",
106-
type=int,
107-
required=False,
108-
default=0,
109-
help=" Block that the Chief from dss-deployment-file was deployed at (e.g. 8836668",
110-
)
111-
112-
parser.add_argument(
113-
"--max-errors",
114-
type=int,
115-
default=100,
116-
help="Maximum number of allowed errors before the keeper terminates (default: 100)",
117-
)
118-
119-
parser.add_argument(
120-
"--debug", dest="debug", action="store_true", help="Enable debug output"
121-
)
122-
123-
parser.add_argument(
124-
"--blocknative-api-key",
125-
type=str,
126-
default=None,
127-
help="Blocknative API key",
128-
)
129-
130-
parser.add_argument(
131-
"--gas-initial-multiplier",
132-
type=str,
133-
default=1.0,
134-
help="gas multiplier",
135-
)
136-
parser.add_argument(
137-
"--gas-reactive-multiplier",
138-
type=str,
139-
default=2.25,
140-
help="gas strategy tuning",
141-
)
142-
parser.add_argument(
143-
"--gas-maximum", type=str, default=5000, help="gas strategy tuning"
144-
)
84+
parser.add_argument("--rpc-primary-url", type=str, required=True, help="Primary JSON-RPC host URL")
85+
parser.add_argument("--rpc-primary-timeout", type=int, default=1200, help="Primary JSON-RPC timeout (in seconds, default: 1200)")
86+
parser.add_argument("--rpc-backup-url", type=str, required=True, help="Backup JSON-RPC host URL")
87+
parser.add_argument("--rpc-backup-timeout", type=int, default=1200, help="Backup JSON-RPC timeout (in seconds, default: 1200)")
88+
parser.add_argument("--network", type=str, required=True, help="Network that you're running the Keeper on (options, 'mainnet', 'kovan', 'testnet')")
89+
parser.add_argument("--eth-from", type=str, required=True, help="Ethereum address from which to send transactions; checksummed (e.g. '0x12AebC')")
90+
parser.add_argument("--eth-key", type=str, nargs="*", help="Ethereum private key(s) to use (e.g. 'key_file=/path/to/keystore.json,pass_file=/path/to/passphrase.txt')")
91+
parser.add_argument("--dss-deployment-file", type=str, required=False, help="Json description of all the system addresses (e.g. /Full/Path/To/configFile.json)")
92+
parser.add_argument("--chief-deployment-block", type=int, required=False, default=0, help="Block that the Chief from dss-deployment-file was deployed at (e.g. 8836668")
93+
parser.add_argument("--max-errors", type=int, default=100, help="Maximum number of allowed errors before the keeper terminates (default: 100)")
94+
parser.add_argument("--debug", dest="debug", action="store_true", help="Enable debug output")
95+
parser.add_argument("--blocknative-api-key", type=str, default=None, help="Blocknative API key")
96+
parser.add_argument("--gas-initial-multiplier", type=float, default=1.0, help="gas multiplier")
97+
parser.add_argument("--gas-reactive-multiplier", type=float, default=2.25, help="gas strategy tuning")
98+
parser.add_argument("--gas-maximum", type=int, default=5000, help="gas strategy tuning")
14599

146100
parser.set_defaults(cageFacilitated=False)
147101
self.arguments = parser.parse_args(args)
148102

149-
self.web3 = kwargs['web3'] if 'web3' in kwargs else Web3(HTTPProvider(endpoint_uri=self.arguments.rpc_host,
150-
request_kwargs={"timeout": self.arguments.rpc_timeout}))
103+
# Initialize logger before any method that uses it
104+
self.logger = logger
151105

152-
self.web3.eth.defaultAccount = self.arguments.eth_from
153-
register_keys(self.web3, self.arguments.eth_key)
154-
self.our_address = Address(self.arguments.eth_from)
106+
self.print_arguments()
107+
108+
self.web3 = None
109+
self.node_type = None
110+
self._initialize_blockchain_connection()
155111

156-
isConnected = self.web3.isConnected()
157-
self.logger.info(f'web3 isConntected is: {isConnected}')
112+
# Set the Ethereum address and register keys
113+
# self.web3.eth.defaultAccount = self.arguments.eth_from
114+
# register_keys(self.web3, self.arguments.eth_key)
115+
self.our_address = Address(self.arguments.eth_from)
158116

159117
if self.arguments.dss_deployment_file:
160118
self.dss = DssDeployment.from_json(
@@ -173,11 +131,58 @@ def __init__(self, args: list, **kwargs):
173131

174132
self.confirmations = 0
175133

176-
logging.basicConfig(
177-
format="%(asctime)-15s %(levelname)-8s %(message)s",
178-
level=(logging.DEBUG if self.arguments.debug else logging.INFO),
134+
def print_arguments(self):
    """Log every parsed command-line argument at INFO level, one per line.

    Values that can carry credentials are not logged verbatim:

    * ``blocknative_api_key`` is replaced by ``***`` when it is set.
    * ``rpc_primary_url`` / ``rpc_backup_url`` are reduced to their hostname,
      because RPC URLs may embed an API key in the path or query string.
      A URL without a parseable hostname is logged as ``<redacted>``.

    All other arguments are logged unchanged as ``"<name>: <value>"``.
    """
    secret_args = {"blocknative_api_key"}
    url_args = {"rpc_primary_url", "rpc_backup_url"}

    for arg, value in vars(self.arguments).items():
        # Unset (None) values are logged as-is so it is visible they are unset.
        if value is not None:
            if arg in secret_args:
                value = "***"
            elif arg in url_args:
                value = urlparse(str(value)).hostname or "<redacted>"
        self.logger.info(f"{arg}: {value}")
138+
139+
def _initialize_blockchain_connection(self):
    """Establish the node connection, preferring the primary over the backup.

    The primary node is tried first. If it cannot be used, the switch is
    logged at INFO level and the backup node is tried. If neither works, a
    CRITICAL record is logged.
    """
    if self._connect_to_primary_node():
        return

    self.logger.info("Switching to backup node.")
    if self._connect_to_backup_node():
        return

    self.logger.critical(
        "Error: Couldn't connect to the primary and backup Ethereum nodes."
    )
147+
148+
def _connect_to_primary_node(self):
    """Attempt a connection to the primary RPC endpoint.

    Uses the ``rpc_primary_url`` and ``rpc_primary_timeout`` arguments and
    returns whatever ``_connect_to_node`` returns for them.
    """
    args = self.arguments
    url, timeout = args.rpc_primary_url, args.rpc_primary_timeout
    return self._connect_to_node(url, timeout, "primary")
180153

154+
def _connect_to_backup_node(self):
    """Attempt a connection to the backup RPC endpoint.

    Uses the ``rpc_backup_url`` and ``rpc_backup_timeout`` arguments and
    returns whatever ``_connect_to_node`` returns for them.
    """
    args = self.arguments
    url, timeout = args.rpc_backup_url, args.rpc_backup_timeout
    return self._connect_to_node(url, timeout, "backup")
159+
160+
def _connect_to_node(self, rpc_url, rpc_timeout, node_type):
    """Try to connect to one Ethereum node and adopt it as the active node.

    Args:
        rpc_url: JSON-RPC endpoint URL passed to ``HTTPProvider``.
        rpc_timeout: request timeout handed to the provider as
            ``{"timeout": rpc_timeout}``.
        node_type: label stored in ``self.node_type`` on success (callers in
            this class pass ``"primary"`` or ``"backup"``).

    Returns:
        True when the node answered the connectivity probe and
        ``_configure_web3()`` succeeded, False otherwise. On False,
        ``self.web3`` and ``self.node_type`` keep the values they had before
        the call.
    """
    try:
        candidate = Web3(HTTPProvider(rpc_url, {"timeout": rpc_timeout}))
        # The probe performs a real request, so it stays inside the try: any
        # failure it raises must lead to failover rather than an unhandled
        # exception in the constructor.
        connected = candidate.isConnected()
    except Exception as e:
        self.logger.error(f"Error connecting to Ethereum node: {e}")
        return False

    if not connected:
        return False

    # _configure_web3() reads self.web3, so the candidate has to be installed
    # before configuring it; remember the old state to undo that on failure.
    previous_web3, previous_node_type = self.web3, self.node_type
    self.web3 = candidate
    self.node_type = node_type
    if self._configure_web3():
        return True

    self.web3, self.node_type = previous_web3, previous_node_type
    return False
173+
174+
def _configure_web3(self):
    """Set the default account on ``self.web3`` and register the signing keys.

    Returns:
        False (after logging the error) if setting the default account or
        registering the keys raises; True once both succeeded and the
        hostname of the connected endpoint has been logged.
    """
    try:
        self.web3.eth.defaultAccount = self.arguments.eth_from
        register_keys(self.web3, self.arguments.eth_key)
    except Exception as exc:
        self.logger.error(f"Error configuring Web3: {exc}")
        return False

    # Only the hostname is logged, not the full endpoint URI.
    endpoint = self.web3.provider.endpoint_uri
    self.logger.info(f"Connected to Ethereum node at {urlparse(endpoint).hostname}")
    return True
181186

182187
def main(self):
183188
"""Initialize the lifecycle and enter into the Keeper Lifecycle controller.
@@ -244,14 +249,18 @@ def process_block(self):
244249
"""Callback called on each new block. If too many errors, terminate the keeper.
245250
This is the entrypoint to the Keeper's monitoring logic
246251
"""
247-
isConnected = self.web3.isConnected()
248-
self.logger.info(f'web3 isConntected is: {isConnected}')
249-
250-
if self.errors >= self.max_errors:
251-
self.lifecycle.terminate()
252-
else:
253-
self.check_hat()
254-
self.check_eta()
252+
try:
253+
isConnected = self.web3.isConnected()
254+
self.logger.info(f'web3 isConnected: {isConnected}')
255+
256+
if self.errors >= self.max_errors:
257+
self.lifecycle.terminate()
258+
else:
259+
self.check_hat()
260+
self.check_eta()
261+
except (TimeExhausted, Exception) as e:
262+
self.logger.error(f"Error processing block: {e}")
263+
self.errors += 1
255264

256265
def check_hat(self):
257266
"""Ensures the Hat is on the proposal (spell, EOA, multisig, etc) with the most approval.
@@ -265,7 +274,13 @@ def check_hat(self):
265274
blockNumber = self.web3.eth.blockNumber
266275
self.logger.info(f"Checking Hat on block {blockNumber}")
267276

268-
self.database.update_db_yays(blockNumber)
277+
try:
278+
self.database.update_db_yays(blockNumber)
279+
except (TimeExhausted, Exception) as e:
280+
self.logger.error(f"Error updating database yays: {e}")
281+
self.errors += 1
282+
return
283+
269284
yays = self.database.db.get(doc_id=2)["yays"]
270285

271286
hat = self.dss.ds_chief.get_hat().address

chief_keeper/database.py

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from tinydb import TinyDB, Query
2323
from web3 import Web3
24+
from web3.exceptions import TimeExhausted
2425

2526
from chief_keeper.spell import DSSSpell
2627

0 commit comments

Comments
 (0)