Skip to content

Commit 16fe38e

Browse files
authored
Run oracle for independently for every network (#86)
Signed-off-by: cyc60 <[email protected]>
1 parent f9043b4 commit 16fe38e

File tree

7 files changed

+80
-50
lines changed

7 files changed

+80
-50
lines changed

oracle/oracle/clients.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,6 @@ class GraphqlConsensusError(ConnectionError):
3030
pass
3131

3232

33-
def with_consensus(f):
34-
def wrapper(*args, **kwargs):
35-
try:
36-
return f(*args, **kwargs)
37-
except GraphqlConsensusError as e:
38-
logger.error(f"There is no consensus in GraphQL query: {e}")
39-
return
40-
41-
return wrapper
42-
43-
4433
async def execute_single_gql_query(
4534
subgraph_url: str, query: DocumentNode, variables: Dict
4635
):

oracle/oracle/distributor/controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from web3 import Web3
77

88
from oracle.networks import NETWORKS
9-
from oracle.oracle.clients import with_consensus
9+
from oracle.oracle.utils import save
1010
from oracle.settings import DISTRIBUTOR_VOTE_FILENAME
1111

1212
from ..eth1 import submit_vote
@@ -43,7 +43,7 @@ def __init__(self, network: str, oracle: LocalAccount) -> None:
4343
"REWARD_TOKEN_CONTRACT_ADDRESS"
4444
]
4545

46-
@with_consensus
46+
@save
4747
async def process(self, voting_params: DistributorVotingParameters) -> None:
4848
"""Submits vote for the new merkle root and merkle proofs to the IPFS."""
4949
from_block = voting_params["from_block"]

oracle/oracle/eth1.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,7 @@
1111
from web3.types import BlockNumber, Timestamp, Wei
1212

1313
from oracle.networks import NETWORKS
14-
from oracle.oracle.clients import (
15-
execute_single_gql_query,
16-
execute_sw_gql_query,
17-
with_consensus,
18-
)
14+
from oracle.oracle.clients import execute_single_gql_query, execute_sw_gql_query
1915
from oracle.oracle.graphql_queries import (
2016
FINALIZED_BLOCK_QUERY,
2117
LATEST_BLOCK_QUERY,
@@ -98,7 +94,6 @@ async def has_synced_block(network: str, block_number: BlockNumber) -> bool:
9894
return block_number <= int(result["_meta"]["block"]["number"])
9995

10096

101-
@with_consensus
10297
async def get_voting_parameters(
10398
network: str, block_number: BlockNumber
10499
) -> VotingParameters:

oracle/oracle/main.py

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,42 @@
4545

4646
async def main() -> None:
4747
oracle_accounts: Dict[str, LocalAccount] = await get_oracle_accounts()
48+
# aiohttp session
49+
session = aiohttp.ClientSession()
50+
await init_checks(oracle_accounts, session)
51+
52+
# wait for interrupt
53+
interrupt_handler = InterruptHandler()
54+
55+
# fetch ETH2 genesis
56+
controllers = []
57+
for network in ENABLED_NETWORKS:
58+
genesis = await get_genesis(network, session)
59+
oracle = oracle_accounts[network]
60+
rewards_controller = RewardsController(
61+
network=network,
62+
aiohttp_session=session,
63+
genesis_timestamp=int(genesis["genesis_time"]),
64+
oracle=oracle,
65+
)
66+
distributor_controller = DistributorController(network, oracle)
67+
validators_controller = ValidatorsController(network, oracle)
68+
controllers.append(
69+
(
70+
interrupt_handler,
71+
network,
72+
rewards_controller,
73+
distributor_controller,
74+
validators_controller,
75+
)
76+
)
77+
78+
await asyncio.gather(*[process_network(*args) for args in controllers])
79+
80+
await session.close()
4881

82+
83+
async def init_checks(oracle_accounts, session):
4984
# try submitting test vote
5085
for network, oracle in oracle_accounts.items():
5186
logger.info(f"[{network}] Submitting test vote for account {oracle.address}...")
@@ -69,9 +104,6 @@ async def main() -> None:
69104
]
70105
logger.info(f"[{network}] Connected to graph nodes at {parsed_uris}")
71106

72-
# aiohttp session
73-
session = aiohttp.ClientSession()
74-
75107
# check ETH2 API connection
76108
for network in ENABLED_NETWORKS:
77109
network_config = NETWORKS[network]
@@ -82,28 +114,16 @@ async def main() -> None:
82114
)
83115
logger.info(f"[{network}] Connected to ETH2 node at {parsed_uri}")
84116

85-
# wait for interrupt
86-
interrupt_handler = InterruptHandler()
87-
88-
# fetch ETH2 genesis
89-
controllers = []
90-
for network in ENABLED_NETWORKS:
91-
genesis = await get_genesis(network, session)
92-
oracle = oracle_accounts[network]
93-
rewards_controller = RewardsController(
94-
network=network,
95-
aiohttp_session=session,
96-
genesis_timestamp=int(genesis["genesis_time"]),
97-
oracle=oracle,
98-
)
99-
distributor_controller = DistributorController(network, oracle)
100-
validators_controller = ValidatorsController(network, oracle)
101-
controllers.append(
102-
(network, rewards_controller, distributor_controller, validators_controller)
103-
)
104117

118+
async def process_network(
119+
interrupt_handler: InterruptHandler,
120+
network: str,
121+
rewards_ctrl: RewardsController,
122+
distributor_ctrl: DistributorController,
123+
validators_ctrl: ValidatorsController,
124+
) -> None:
105125
while not interrupt_handler.exit:
106-
for (network, rewards_ctrl, distributor_ctrl, validators_ctrl) in controllers:
126+
try:
107127
# fetch current finalized ETH1 block data
108128
finalized_block = await get_finalized_block(network)
109129
current_block_number = finalized_block["block_number"]
@@ -119,7 +139,7 @@ async def main() -> None:
119139
)
120140
# there is no consensus
121141
if not voting_parameters:
122-
continue
142+
return
123143

124144
await asyncio.gather(
125145
# check and update staking rewards
@@ -136,12 +156,11 @@ async def main() -> None:
136156
block_number=latest_block_number,
137157
),
138158
)
159+
except BaseException as e:
160+
logger.exception(e)
139161

140-
# wait until next processing time
141162
await asyncio.sleep(ORACLE_PROCESS_INTERVAL)
142163

143-
await session.close()
144-
145164

146165
if __name__ == "__main__":
147166
if ENABLE_HEALTH_SERVER:

oracle/oracle/rewards/controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from web3.types import Timestamp, Wei
1111

1212
from oracle.networks import GNOSIS_CHAIN, NETWORKS
13-
from oracle.oracle.clients import with_consensus
1413
from oracle.oracle.eth1 import submit_vote
1514
from oracle.oracle.rewards.types import RewardsVotingParameters, RewardVote
15+
from oracle.oracle.utils import save
1616
from oracle.settings import MGNO_RATE, REWARD_VOTE_FILENAME, WAD
1717

1818
from .eth1 import get_registered_validators_public_keys
@@ -50,7 +50,7 @@ def __init__(
5050
self.deposit_token_symbol = NETWORKS[network]["DEPOSIT_TOKEN_SYMBOL"]
5151
self.last_vote_total_rewards = None
5252

53-
@with_consensus
53+
@save
5454
async def process(
5555
self,
5656
voting_params: RewardsVotingParameters,

oracle/oracle/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import asyncio
2+
import logging
3+
from functools import wraps
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
def save(func):
9+
if asyncio.iscoroutinefunction(func):
10+
11+
@wraps(func)
12+
async def wrapper(*args, **kwargs):
13+
try:
14+
return await func(*args, **kwargs)
15+
except BaseException as e:
16+
logger.exception(e)
17+
18+
else:
19+
20+
@wraps(func)
21+
def wrapper(*args, **kwargs):
22+
try:
23+
return func(*args, **kwargs)
24+
except BaseException as e:
25+
logger.exception(e)
26+
27+
return wrapper

oracle/oracle/validators/controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from web3.types import Wei
88

99
from oracle.networks import GNOSIS_CHAIN, NETWORKS
10-
from oracle.oracle.clients import with_consensus
1110
from oracle.oracle.eth1 import submit_vote
11+
from oracle.oracle.utils import save
1212
from oracle.settings import MGNO_RATE, VALIDATOR_VOTE_FILENAME, WAD
1313

1414
from .eth1 import get_validators_deposit_root, select_validator
@@ -30,7 +30,7 @@ def __init__(self, network: str, oracle: LocalAccount) -> None:
3030
self.validators_batch_size = NETWORKS[self.network]["VALIDATORS_BATCH_SIZE"]
3131
self.last_validators_deposit_data = []
3232

33-
@with_consensus
33+
@save
3434
async def process(
3535
self,
3636
voting_params: ValidatorVotingParameters,

0 commit comments

Comments
 (0)