Skip to content

Commit ef5d789

Browse files
authored
eth,eth/watcher: Create Chainlink price feed watcher (#2972)
* eth/watchers: Create PriceFeed watcher Makefile: Use mockgen binary from tool dependencies eth/contracts: Add chainlink interfaces source Makefile: Generate Chainlink contracts ABI tools: Add abigen tool to repo eth/contracts: Generate chainlink bindings Makefile: Fix abigen bindings generation Revert everything abigen Turns out there's already bindings exported from the Chainlink lib. go.mod: Add chainlink library eth/watchers: Add pricefeed watcher eth/watchers: Clean-up event watching code eth/watchers: Improve price tracking Revert "go.mod: Add chainlink library" This reverts commit ac415bd. Revert "Revert everything abigen" This reverts commit b7c40b1. eth/contracts: Gen bindings for proxy iface eth/watchers: Use local bindings for contracts eth/watchers: Simplify event subs logic eth/watchers: Simplify&optimize truncated ticker eth/watchers: Update decimals on fetch eth/watchers: Improve handling of decimals eth/watchers: Fix price rat creation eth/watchers: Make sure we use UTC on truncated timer eth/contracts/chainlink: Generate only V3 contract bindings eth/watchers: Watch PriceFeed only with polling eth/watchers: Add a retry logic on price update eth/watchers: Use clog instead of fmt.Printf * eth: Create separate pricefeed client unit This will make the code more testable. * eth: Add tests for pricefeed client * eth/watchers: Add tests to the truncated ticker Gosh that was much harder than I thought * eth/watchers: Add tests for pricefeedwatcher * eth: Add comments to the new components * go fmt * eth: Address minor review comments * eth,eth/watchers: Improve pricefeed watcher interface * eth/watchers: Remove truncated ticker tests
1 parent 6c09a9f commit ef5d789

File tree

11 files changed

+1240
-6
lines changed

11 files changed

+1240
-6
lines changed

Makefile

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
SHELL=/bin/bash
22
GO_BUILD_DIR?="./"
33

4-
all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go livepeer livepeer_cli livepeer_router livepeer_bench
4+
MOCKGEN=go run github.com/golang/mock/mockgen
5+
ABIGEN=go run github.com/ethereum/go-ethereum/cmd/abigen
6+
7+
all: net/lp_rpc.pb.go net/redeemer.pb.go net/redeemer_mock.pb.go core/test_segment.go eth/contracts/chainlink/AggregatorV3Interface.go livepeer livepeer_cli livepeer_router livepeer_bench
58

69
net/lp_rpc.pb.go: net/lp_rpc.proto
710
protoc -I=. --go_out=. --go-grpc_out=. $^
@@ -10,12 +13,21 @@ net/redeemer.pb.go: net/redeemer.proto
1013
protoc -I=. --go_out=. --go-grpc_out=. $^
1114

1215
net/redeemer_mock.pb.go net/redeemer_grpc_mock.pb.go: net/redeemer.pb.go net/redeemer_grpc.pb.go
13-
@mockgen -source net/redeemer.pb.go -destination net/redeemer_mock.pb.go -package net
14-
@mockgen -source net/redeemer_grpc.pb.go -destination net/redeemer_grpc_mock.pb.go -package net
16+
@$(MOCKGEN) -source net/redeemer.pb.go -destination net/redeemer_mock.pb.go -package net
17+
@$(MOCKGEN) -source net/redeemer_grpc.pb.go -destination net/redeemer_grpc_mock.pb.go -package net
1518

1619
core/test_segment.go:
1720
core/test_segment.sh core/test_segment.go
1821

22+
eth/contracts/chainlink/AggregatorV3Interface.go:
23+
solc --version | grep 0.7.6+commit.7338295f
24+
@set -ex; \
25+
for sol_file in eth/contracts/chainlink/*.sol; do \
26+
contract_name=$$(basename "$$sol_file" .sol); \
27+
solc --abi --optimize --overwrite -o $$(dirname "$$sol_file") $$sol_file; \
28+
$(ABIGEN) --abi=$${sol_file%.sol}.abi --pkg=chainlink --type=$$contract_name --out=$${sol_file%.sol}.go; \
29+
done
30+
1931
version=$(shell cat VERSION)
2032

2133
ldflags := -X github.com/livepeer/go-livepeer/core.LivepeerVersion=$(shell ./print_version.sh)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"description","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint80","name":"_roundId","type":"uint80"}],"name":"getRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"latestRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"version","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]

eth/contracts/chainlink/AggregatorV3Interface.go

Lines changed: 394 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// SPDX-License-Identifier: MIT
2+
// https://github.com/smartcontractkit/chainlink/blob/v2.9.1/contracts/src/v0.7/interfaces/AggregatorV3Interface.sol
3+
pragma solidity ^0.7.0;
4+
5+
interface AggregatorV3Interface {
6+
function decimals() external view returns (uint8);
7+
8+
function description() external view returns (string memory);
9+
10+
function version() external view returns (uint256);
11+
12+
// getRoundData and latestRoundData should both raise "No data present"
13+
// if they do not have data to report, instead of returning unset values
14+
// which could be misinterpreted as actual reported values.
15+
function getRoundData(uint80 _roundId)
16+
external
17+
view
18+
returns (
19+
uint80 roundId,
20+
int256 answer,
21+
uint256 startedAt,
22+
uint256 updatedAt,
23+
uint80 answeredInRound
24+
);
25+
26+
function latestRoundData()
27+
external
28+
view
29+
returns (
30+
uint80 roundId,
31+
int256 answer,
32+
uint256 startedAt,
33+
uint256 updatedAt,
34+
uint80 answeredInRound
35+
);
36+
}

eth/pricefeed.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package eth
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"math/big"
7+
"time"
8+
9+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
10+
"github.com/ethereum/go-ethereum/common"
11+
"github.com/ethereum/go-ethereum/ethclient"
12+
"github.com/livepeer/go-livepeer/eth/contracts/chainlink"
13+
)
14+
15+
type PriceData struct {
16+
RoundID int64
17+
Price *big.Rat
18+
UpdatedAt time.Time
19+
}
20+
21+
// PriceFeedEthClient is an interface for fetching price data from a Chainlink
22+
// PriceFeed contract.
23+
type PriceFeedEthClient interface {
24+
Description() (string, error)
25+
FetchPriceData() (PriceData, error)
26+
}
27+
28+
func NewPriceFeedEthClient(ethClient *ethclient.Client, priceFeedAddr string) (PriceFeedEthClient, error) {
29+
addr := common.HexToAddress(priceFeedAddr)
30+
priceFeed, err := chainlink.NewAggregatorV3Interface(addr, ethClient)
31+
if err != nil {
32+
return nil, fmt.Errorf("failed to create aggregator proxy: %w", err)
33+
}
34+
35+
return &priceFeedClient{
36+
client: ethClient,
37+
priceFeed: priceFeed,
38+
}, nil
39+
}
40+
41+
type priceFeedClient struct {
42+
client *ethclient.Client
43+
priceFeed *chainlink.AggregatorV3Interface
44+
}
45+
46+
func (c *priceFeedClient) Description() (string, error) {
47+
return c.priceFeed.Description(&bind.CallOpts{})
48+
}
49+
50+
func (c *priceFeedClient) FetchPriceData() (PriceData, error) {
51+
data, err := c.priceFeed.LatestRoundData(&bind.CallOpts{})
52+
if err != nil {
53+
return PriceData{}, errors.New("failed to get latest round data: " + err.Error())
54+
}
55+
56+
decimals, err := c.priceFeed.Decimals(&bind.CallOpts{})
57+
if err != nil {
58+
return PriceData{}, errors.New("failed to get decimals: " + err.Error())
59+
}
60+
61+
return computePriceData(data.RoundId, data.UpdatedAt, data.Answer, decimals), nil
62+
}
63+
64+
// computePriceData transforms the raw data from the PriceFeed into the higher
65+
// level PriceData struct, more easily usable by the rest of the system.
66+
func computePriceData(roundID, updatedAt, answer *big.Int, decimals uint8) PriceData {
67+
// Compute a big.int which is 10^decimals.
68+
divisor := new(big.Int).Exp(
69+
big.NewInt(10),
70+
big.NewInt(int64(decimals)),
71+
nil)
72+
73+
return PriceData{
74+
RoundID: roundID.Int64(),
75+
Price: new(big.Rat).SetFrac(answer, divisor),
76+
UpdatedAt: time.Unix(updatedAt.Int64(), 0),
77+
}
78+
}

eth/pricefeed_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package eth
2+
3+
import (
4+
"math/big"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestComputePriceData(t *testing.T) {
11+
assert := assert.New(t)
12+
13+
t.Run("valid data", func(t *testing.T) {
14+
roundID := big.NewInt(1)
15+
updatedAt := big.NewInt(1626192000)
16+
answer := big.NewInt(420666000)
17+
decimals := uint8(6)
18+
19+
data := computePriceData(roundID, updatedAt, answer, decimals)
20+
21+
assert.EqualValues(int64(1), data.RoundID, "Round ID didn't match")
22+
assert.Equal("210333/500", data.Price.RatString(), "The Price Rat didn't match")
23+
assert.Equal("2021-07-13 16:00:00 +0000 UTC", data.UpdatedAt.UTC().String(), "The updated at time did not match")
24+
})
25+
26+
t.Run("zero answer", func(t *testing.T) {
27+
roundID := big.NewInt(2)
28+
updatedAt := big.NewInt(1626192000)
29+
answer := big.NewInt(0)
30+
decimals := uint8(18)
31+
32+
data := computePriceData(roundID, updatedAt, answer, decimals)
33+
34+
assert.EqualValues(int64(2), data.RoundID, "Round ID didn't match")
35+
assert.Equal("0", data.Price.RatString(), "The Price Rat didn't match")
36+
assert.Equal("2021-07-13 16:00:00 +0000 UTC", data.UpdatedAt.UTC().String(), "The updated at time did not match")
37+
})
38+
39+
t.Run("zero decimals", func(t *testing.T) {
40+
roundID := big.NewInt(3)
41+
updatedAt := big.NewInt(1626192000)
42+
answer := big.NewInt(13)
43+
decimals := uint8(0)
44+
45+
data := computePriceData(roundID, updatedAt, answer, decimals)
46+
47+
assert.EqualValues(int64(3), data.RoundID, "Round ID didn't match")
48+
assert.Equal("13", data.Price.RatString(), "The Price Rat didn't match")
49+
assert.Equal("2021-07-13 16:00:00 +0000 UTC", data.UpdatedAt.UTC().String(), "The updated at time did not match")
50+
})
51+
}

eth/watchers/pricefeedwatcher.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package watchers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/ethclient"
11+
"github.com/ethereum/go-ethereum/event"
12+
"github.com/livepeer/go-livepeer/clog"
13+
"github.com/livepeer/go-livepeer/eth"
14+
)
15+
16+
const (
17+
priceUpdateMaxRetries = 5
18+
priceUpdateBaseRetryDelay = 30 * time.Second
19+
priceUpdatePeriod = 1 * time.Hour
20+
)
21+
22+
// PriceFeedWatcher monitors a Chainlink PriceFeed for updated pricing info. It
23+
// allows fetching the current price as well as listening for updates on the
24+
// PriceUpdated channel.
25+
type PriceFeedWatcher struct {
26+
baseRetryDelay time.Duration
27+
28+
priceFeed eth.PriceFeedEthClient
29+
currencyBase, currencyQuote string
30+
31+
mu sync.RWMutex
32+
current eth.PriceData
33+
priceEventFeed event.Feed
34+
}
35+
36+
// NewPriceFeedWatcher creates a new PriceFeedWatcher instance. It will already
37+
// fetch the current price and start a goroutine to watch for updates.
38+
func NewPriceFeedWatcher(ethClient *ethclient.Client, priceFeedAddr string) (*PriceFeedWatcher, error) {
39+
priceFeed, err := eth.NewPriceFeedEthClient(ethClient, priceFeedAddr)
40+
if err != nil {
41+
return nil, fmt.Errorf("failed to create price feed client: %w", err)
42+
}
43+
44+
description, err := priceFeed.Description()
45+
if err != nil {
46+
return nil, fmt.Errorf("failed to get description: %w", err)
47+
}
48+
49+
currencyFrom, currencyTo, err := parseCurrencies(description)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
w := &PriceFeedWatcher{
55+
baseRetryDelay: priceUpdateBaseRetryDelay,
56+
priceFeed: priceFeed,
57+
currencyBase: currencyFrom,
58+
currencyQuote: currencyTo,
59+
}
60+
61+
err = w.updatePrice()
62+
if err != nil {
63+
return nil, fmt.Errorf("failed to update price: %w", err)
64+
}
65+
66+
return w, nil
67+
}
68+
69+
// Currencies returns the base and quote currencies of the price feed.
70+
// i.e. base = CurrentPrice() * quote
71+
func (w *PriceFeedWatcher) Currencies() (base string, quote string) {
72+
return w.currencyBase, w.currencyQuote
73+
}
74+
75+
// Current returns the latest fetched price data.
76+
func (w *PriceFeedWatcher) Current() eth.PriceData {
77+
w.mu.RLock()
78+
defer w.mu.RUnlock()
79+
return w.current
80+
}
81+
82+
// Subscribe allows one to subscribe to price updates emitted by the Watcher.
83+
// To unsubscribe, simply call `Unsubscribe` on the returned subscription.
84+
// The sink channel should have ample buffer space to avoid blocking other
85+
// subscribers. Slow subscribers are not dropped.
86+
func (w *PriceFeedWatcher) Subscribe(sub chan<- eth.PriceData) event.Subscription {
87+
return w.priceEventFeed.Subscribe(sub)
88+
}
89+
90+
func (w *PriceFeedWatcher) updatePrice() error {
91+
newPrice, err := w.priceFeed.FetchPriceData()
92+
if err != nil {
93+
return fmt.Errorf("failed to fetch price data: %w", err)
94+
}
95+
96+
if newPrice.UpdatedAt.After(w.current.UpdatedAt) {
97+
w.mu.Lock()
98+
w.current = newPrice
99+
w.mu.Unlock()
100+
w.priceEventFeed.Send(newPrice)
101+
}
102+
103+
return nil
104+
}
105+
106+
// Watch starts the watch process. It will periodically poll the price feed for
107+
// price updates until the given context is canceled. Typically, you want to
108+
// call Watch inside a goroutine.
109+
func (w *PriceFeedWatcher) Watch(ctx context.Context) {
110+
ticker := newTruncatedTicker(ctx, priceUpdatePeriod)
111+
w.watchTicker(ctx, ticker)
112+
}
113+
114+
func (w *PriceFeedWatcher) watchTicker(ctx context.Context, ticker <-chan time.Time) {
115+
for {
116+
select {
117+
case <-ctx.Done():
118+
return
119+
case <-ticker:
120+
attempt, retryDelay := 1, w.baseRetryDelay
121+
for {
122+
err := w.updatePrice()
123+
if err == nil {
124+
break
125+
} else if attempt >= priceUpdateMaxRetries {
126+
clog.Errorf(ctx, "Failed to fetch updated price from PriceFeed attempts=%d err=%q", attempt, err)
127+
break
128+
}
129+
130+
clog.Warningf(ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%q", retryDelay, attempt, err)
131+
select {
132+
case <-ctx.Done():
133+
return
134+
case <-time.After(retryDelay):
135+
}
136+
attempt, retryDelay = attempt+1, retryDelay*2
137+
}
138+
}
139+
}
140+
}
141+
142+
// parseCurrencies parses the base and quote currencies from a price feed based
143+
// on Chainlink PriceFeed description pattern "FROM / TO".
144+
func parseCurrencies(description string) (currencyBase string, currencyQuote string, err error) {
145+
currencies := strings.Split(description, "/")
146+
if len(currencies) != 2 {
147+
return "", "", fmt.Errorf("aggregator description must be in the format 'FROM / TO' but got: %s", description)
148+
}
149+
150+
currencyBase = strings.TrimSpace(currencies[0])
151+
currencyQuote = strings.TrimSpace(currencies[1])
152+
return
153+
}
154+
155+
// newTruncatedTicker creates a ticker that ticks at the next time that is a
156+
// multiple of d, starting from the current time. This is a best-effort approach
157+
// to ensure that nodes update their prices around the same time to avoid too
158+
// big price discrepancies.
159+
func newTruncatedTicker(ctx context.Context, d time.Duration) <-chan time.Time {
160+
ch := make(chan time.Time, 1)
161+
go func() {
162+
defer close(ch)
163+
164+
nextTick := time.Now().UTC().Truncate(d)
165+
for {
166+
nextTick = nextTick.Add(d)
167+
untilNextTick := nextTick.Sub(time.Now().UTC())
168+
if untilNextTick <= 0 {
169+
continue
170+
}
171+
172+
select {
173+
case <-ctx.Done():
174+
return
175+
case t := <-time.After(untilNextTick):
176+
ch <- t
177+
}
178+
}
179+
}()
180+
181+
return ch
182+
}

0 commit comments

Comments
 (0)