Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Commit 36094ff

Browse files
author
Dongsu Park
committed
registry: use etcd.Config.HeaderTimeoutPerRequest instead of internal timeout
As etcd's keysAPI already supports HeaderTimeoutPerRequest, fleet should not handle request timeout on its own, but just pass on the timeout value to the etcd client. For that, set timeout only in etcd.Config.HeaderTimeoutPerRequest, and get rid of internal timeout handling from both EtcdRegistry and LeaseManager. Also remove EtcdRegistry.ctx(), and just call context.Background() directly, as suggested by Jon. Suggested-by: Jonathan Boulle <[email protected]> Partially resolves #1397
1 parent 8685cfb commit 36094ff

File tree

10 files changed

+53
-57
lines changed

10 files changed

+53
-57
lines changed

fleetctl/fleetctl.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,9 @@ func getRegistryClient() (client.API, error) {
462462
}
463463

464464
eCfg := etcd.Config{
465-
Endpoints: strings.Split(globalFlags.Endpoint, ","),
466-
Transport: trans,
465+
Endpoints: strings.Split(globalFlags.Endpoint, ","),
466+
Transport: trans,
467+
HeaderTimeoutPerRequest: getRequestTimeoutFlag(),
467468
}
468469

469470
eClient, err := etcd.New(eCfg)
@@ -472,7 +473,7 @@ func getRegistryClient() (client.API, error) {
472473
}
473474

474475
kAPI := etcd.NewKeysAPI(eClient)
475-
reg := registry.NewEtcdRegistry(kAPI, globalFlags.EtcdKeyPrefix, getRequestTimeoutFlag())
476+
reg := registry.NewEtcdRegistry(kAPI, globalFlags.EtcdKeyPrefix)
476477

477478
if msg, ok := checkVersion(reg); !ok {
478479
stderr(msg)

pkg/lease/etcd.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,16 @@ func serializeLeaseMetadata(machID string, ver int) (string, error) {
9797
}
9898

9999
type etcdLeaseManager struct {
100-
kAPI etcd.KeysAPI
101-
keyPrefix string
102-
reqTimeout time.Duration
100+
kAPI etcd.KeysAPI
101+
keyPrefix string
103102
}
104103

105-
func NewEtcdLeaseManager(kAPI etcd.KeysAPI, keyPrefix string, reqTimeout time.Duration) *etcdLeaseManager {
106-
return &etcdLeaseManager{kAPI: kAPI, keyPrefix: keyPrefix, reqTimeout: reqTimeout}
104+
func NewEtcdLeaseManager(kAPI etcd.KeysAPI, keyPrefix string) *etcdLeaseManager {
105+
return &etcdLeaseManager{kAPI: kAPI, keyPrefix: keyPrefix}
107106
}
108107

109108
func (r *etcdLeaseManager) ctx() context.Context {
110-
ctx, _ := context.WithTimeout(context.Background(), r.reqTimeout)
111-
return ctx
109+
return context.Background()
112110
}
113111

114112
func (r *etcdLeaseManager) leasePath(name string) string {

registry/etcd.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,23 @@ package registry
1616

1717
import (
1818
"path"
19-
"time"
2019

2120
etcd "github.com/coreos/etcd/client"
22-
"golang.org/x/net/context"
2321
)
2422

2523
const DefaultKeyPrefix = "/_coreos.com/fleet/"
2624

27-
func NewEtcdRegistry(kAPI etcd.KeysAPI, keyPrefix string, reqTimeout time.Duration) *EtcdRegistry {
25+
func NewEtcdRegistry(kAPI etcd.KeysAPI, keyPrefix string) *EtcdRegistry {
2826
return &EtcdRegistry{
29-
kAPI: kAPI,
30-
keyPrefix: keyPrefix,
31-
reqTimeout: reqTimeout,
27+
kAPI: kAPI,
28+
keyPrefix: keyPrefix,
3229
}
3330
}
3431

3532
// EtcdRegistry fulfils the Registry interface and uses etcd as a backend
3633
type EtcdRegistry struct {
37-
kAPI etcd.KeysAPI
38-
keyPrefix string
39-
reqTimeout time.Duration
40-
}
41-
42-
func (r *EtcdRegistry) ctx() context.Context {
43-
ctx, _ := context.WithTimeout(context.Background(), r.reqTimeout)
44-
return ctx
34+
kAPI etcd.KeysAPI
35+
keyPrefix string
4536
}
4637

4738
func (r *EtcdRegistry) prefixed(p ...string) string {

registry/job.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sort"
2222

2323
etcd "github.com/coreos/etcd/client"
24+
"golang.org/x/net/context"
2425

2526
"github.com/coreos/fleet/job"
2627
"github.com/coreos/fleet/log"
@@ -38,7 +39,7 @@ func (r *EtcdRegistry) Schedule() ([]job.ScheduledUnit, error) {
3839
Sort: true,
3940
Recursive: true,
4041
}
41-
res, err := r.kAPI.Get(r.ctx(), key, opts)
42+
res, err := r.kAPI.Get(context.Background(), key, opts)
4243
if err != nil {
4344
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
4445
err = nil
@@ -94,7 +95,7 @@ func (r *EtcdRegistry) Units() ([]job.Unit, error) {
9495
Sort: true,
9596
Recursive: true,
9697
}
97-
res, err := r.kAPI.Get(r.ctx(), key, opts)
98+
res, err := r.kAPI.Get(context.Background(), key, opts)
9899
if err != nil {
99100
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
100101
err = nil
@@ -142,7 +143,7 @@ func (r *EtcdRegistry) Unit(name string) (*job.Unit, error) {
142143
opts := &etcd.GetOptions{
143144
Recursive: true,
144145
}
145-
res, err := r.kAPI.Get(r.ctx(), key, opts)
146+
res, err := r.kAPI.Get(context.Background(), key, opts)
146147
if err != nil {
147148
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
148149
err = nil
@@ -192,7 +193,7 @@ func (r *EtcdRegistry) ScheduledUnit(name string) (*job.ScheduledUnit, error) {
192193
opts := &etcd.GetOptions{
193194
Recursive: true,
194195
}
195-
res, err := r.kAPI.Get(r.ctx(), key, opts)
196+
res, err := r.kAPI.Get(context.Background(), key, opts)
196197
if err != nil {
197198
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
198199
err = nil
@@ -224,7 +225,7 @@ func (r *EtcdRegistry) UnscheduleUnit(name, machID string) error {
224225
opts := &etcd.DeleteOptions{
225226
PrevValue: machID,
226227
}
227-
_, err := r.kAPI.Delete(r.ctx(), key, opts)
228+
_, err := r.kAPI.Delete(context.Background(), key, opts)
228229
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
229230
err = nil
230231
}
@@ -297,7 +298,7 @@ func (r *EtcdRegistry) DestroyUnit(name string) error {
297298
opts := &etcd.DeleteOptions{
298299
Recursive: true,
299300
}
300-
_, err := r.kAPI.Delete(r.ctx(), key, opts)
301+
_, err := r.kAPI.Delete(context.Background(), key, opts)
301302
if err != nil {
302303
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
303304
err = errors.New("job does not exist")
@@ -332,7 +333,7 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) {
332333
PrevExist: etcd.PrevIgnore,
333334
}
334335
key := r.prefixed(jobPrefix, u.Name, "object")
335-
_, err = r.kAPI.Set(r.ctx(), key, val, opts)
336+
_, err = r.kAPI.Set(context.Background(), key, val, opts)
336337
if err != nil {
337338
return
338339
}
@@ -342,7 +343,7 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) {
342343

343344
func (r *EtcdRegistry) SetUnitTargetState(name string, state job.JobState) error {
344345
key := r.jobTargetStatePath(name)
345-
_, err := r.kAPI.Set(r.ctx(), key, string(state), nil)
346+
_, err := r.kAPI.Set(context.Background(), key, string(state), nil)
346347
return err
347348
}
348349

@@ -351,7 +352,7 @@ func (r *EtcdRegistry) ScheduleUnit(name string, machID string) error {
351352
opts := &etcd.SetOptions{
352353
PrevExist: etcd.PrevNoExist,
353354
}
354-
_, err := r.kAPI.Set(r.ctx(), key, machID, opts)
355+
_, err := r.kAPI.Set(context.Background(), key, machID, opts)
355356
return err
356357
}
357358

registry/job_state.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
etcd "github.com/coreos/etcd/client"
21+
"golang.org/x/net/context"
2122

2223
"github.com/coreos/fleet/job"
2324
"github.com/coreos/fleet/unit"
@@ -51,13 +52,13 @@ func (r *EtcdRegistry) UnitHeartbeat(name, machID string, ttl time.Duration) err
5152
opts := &etcd.SetOptions{
5253
TTL: ttl,
5354
}
54-
_, err := r.kAPI.Set(r.ctx(), key, machID, opts)
55+
_, err := r.kAPI.Set(context.Background(), key, machID, opts)
5556
return err
5657
}
5758

5859
func (r *EtcdRegistry) ClearUnitHeartbeat(name string) {
5960
key := r.jobHeartbeatPath(name)
60-
r.kAPI.Delete(r.ctx(), key, nil)
61+
r.kAPI.Delete(context.Background(), key, nil)
6162
}
6263

6364
func (r *EtcdRegistry) jobHeartbeatPath(jobName string) string {

registry/machine.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
etcd "github.com/coreos/etcd/client"
22+
"golang.org/x/net/context"
2223

2324
"github.com/coreos/fleet/machine"
2425
)
@@ -34,7 +35,7 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) {
3435
Recursive: true,
3536
}
3637

37-
resp, err := r.kAPI.Get(r.ctx(), key, opts)
38+
resp, err := r.kAPI.Get(context.Background(), key, opts)
3839
if err != nil {
3940
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
4041
err = nil
@@ -72,7 +73,7 @@ func (r *EtcdRegistry) CreateMachineState(ms machine.MachineState, ttl time.Dura
7273
PrevExist: etcd.PrevNoExist,
7374
TTL: ttl,
7475
}
75-
resp, err := r.kAPI.Set(r.ctx(), key, val, opts)
76+
resp, err := r.kAPI.Set(context.Background(), key, val, opts)
7677
if err != nil {
7778
return uint64(0), err
7879
}
@@ -91,7 +92,7 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio
9192
PrevExist: etcd.PrevExist,
9293
TTL: ttl,
9394
}
94-
resp, err := r.kAPI.Set(r.ctx(), key, val, opts)
95+
resp, err := r.kAPI.Set(context.Background(), key, val, opts)
9596
if err == nil {
9697
return resp.Node.ModifiedIndex, nil
9798
}
@@ -100,7 +101,7 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio
100101
// in the cluster know this is a new member
101102
opts.PrevExist = etcd.PrevNoExist
102103

103-
resp, err = r.kAPI.Set(r.ctx(), key, val, opts)
104+
resp, err = r.kAPI.Set(context.Background(), key, val, opts)
104105
if err != nil {
105106
return uint64(0), err
106107
}
@@ -110,7 +111,7 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio
110111

111112
func (r *EtcdRegistry) RemoveMachineState(machID string) error {
112113
key := r.prefixed(machinePrefix, machID, "object")
113-
_, err := r.kAPI.Delete(r.ctx(), key, nil)
114+
_, err := r.kAPI.Delete(context.Background(), key, nil)
114115
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
115116
err = nil
116117
}

registry/unit.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
etcd "github.com/coreos/etcd/client"
22+
"golang.org/x/net/context"
2223

2324
"github.com/coreos/fleet/log"
2425
"github.com/coreos/fleet/metrics"
@@ -44,7 +45,7 @@ func (r *EtcdRegistry) storeOrGetUnitFile(u unit.UnitFile) (err error) {
4445
PrevExist: etcd.PrevNoExist,
4546
}
4647
start := time.Now()
47-
_, err = r.kAPI.Set(r.ctx(), key, val, opts)
48+
_, err = r.kAPI.Set(context.Background(), key, val, opts)
4849
// unit is already stored
4950
if isEtcdError(err, etcd.ErrorCodeNodeExist) {
5051
// TODO(jonboulle): verify more here?
@@ -65,7 +66,7 @@ func (r *EtcdRegistry) getUnitByHash(hash unit.Hash) *unit.UnitFile {
6566
Recursive: true,
6667
}
6768
start := time.Now()
68-
resp, err := r.kAPI.Get(r.ctx(), key, opts)
69+
resp, err := r.kAPI.Get(context.Background(), key, opts)
6970
if err != nil {
7071
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
7172
err = nil
@@ -86,7 +87,7 @@ func (r *EtcdRegistry) getAllUnitsHashMap() (map[string]*unit.UnitFile, error) {
8687
}
8788
hashToUnit := map[string]*unit.UnitFile{}
8889
start := time.Now()
89-
resp, err := r.kAPI.Get(r.ctx(), key, opts)
90+
resp, err := r.kAPI.Get(context.Background(), key, opts)
9091
if err != nil {
9192
metrics.ReportRegistryOpFailure(metrics.GetAll)
9293
return nil, err

registry/unit_state.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
etcd "github.com/coreos/etcd/client"
23+
"golang.org/x/net/context"
2324

2425
"github.com/coreos/fleet/log"
2526
"github.com/coreos/fleet/machine"
@@ -98,7 +99,7 @@ func (r *EtcdRegistry) statesByMUSKey() (map[MUSKey]*unit.UnitState, error) {
9899
opts := &etcd.GetOptions{
99100
Recursive: true,
100101
}
101-
res, err := r.kAPI.Get(r.ctx(), key, opts)
102+
res, err := r.kAPI.Get(context.Background(), key, opts)
102103
if err != nil && !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
103104
return nil, err
104105
}
@@ -127,7 +128,7 @@ func (r *EtcdRegistry) statesByMUSKey() (map[MUSKey]*unit.UnitState, error) {
127128
// given unit that originates from the indicated machine
128129
func (r *EtcdRegistry) getUnitState(uName, machID string) (*unit.UnitState, error) {
129130
key := r.unitStatePath(machID, uName)
130-
res, err := r.kAPI.Get(r.ctx(), key, nil)
131+
res, err := r.kAPI.Get(context.Background(), key, nil)
131132
if err != nil {
132133
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
133134
err = nil
@@ -162,17 +163,17 @@ func (r *EtcdRegistry) SaveUnitState(jobName string, unitState *unit.UnitState,
162163
}
163164

164165
legacyKey := r.legacyUnitStatePath(jobName)
165-
r.kAPI.Set(r.ctx(), legacyKey, val, opts)
166+
r.kAPI.Set(context.Background(), legacyKey, val, opts)
166167

167168
newKey := r.unitStatePath(unitState.MachineID, jobName)
168-
r.kAPI.Set(r.ctx(), newKey, val, opts)
169+
r.kAPI.Set(context.Background(), newKey, val, opts)
169170
}
170171

171172
// Delete the state from the Registry for the given Job's Unit
172173
func (r *EtcdRegistry) RemoveUnitState(jobName string) error {
173174
// TODO(jonboulle): consider https://github.com/coreos/fleet/issues/465
174175
legacyKey := r.legacyUnitStatePath(jobName)
175-
_, err := r.kAPI.Delete(r.ctx(), legacyKey, nil)
176+
_, err := r.kAPI.Delete(context.Background(), legacyKey, nil)
176177
if err != nil && !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
177178
return err
178179
}
@@ -182,7 +183,7 @@ func (r *EtcdRegistry) RemoveUnitState(jobName string) error {
182183
opts := &etcd.DeleteOptions{
183184
Recursive: true,
184185
}
185-
_, err = r.kAPI.Delete(r.ctx(), newKey, opts)
186+
_, err = r.kAPI.Delete(context.Background(), newKey, opts)
186187
if err != nil && !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
187188
return err
188189
}

registry/version.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
etcd "github.com/coreos/etcd/client"
2121
"github.com/coreos/go-semver/semver"
22+
"golang.org/x/net/context"
2223
)
2324

2425
// LatestDaemonVersion attempts to retrieve the latest version of fleetd
@@ -46,7 +47,7 @@ func (r *EtcdRegistry) LatestDaemonVersion() (*semver.Version, error) {
4647

4748
// EngineVersion implements the ClusterRegistry interface
4849
func (r *EtcdRegistry) EngineVersion() (int, error) {
49-
res, err := r.kAPI.Get(r.ctx(), r.engineVersionPath(), nil)
50+
res, err := r.kAPI.Get(context.Background(), r.engineVersionPath(), nil)
5051
if err != nil {
5152
// no big deal, either the cluster is new or is just
5253
// upgrading from old unversioned code
@@ -69,7 +70,7 @@ func (r *EtcdRegistry) UpdateEngineVersion(from, to int) error {
6970
opts := &etcd.SetOptions{
7071
PrevValue: strFrom,
7172
}
72-
_, err := r.kAPI.Set(r.ctx(), key, strTo, opts)
73+
_, err := r.kAPI.Set(context.Background(), key, strTo, opts)
7374
if err == nil {
7475
return nil
7576
} else if !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
@@ -79,7 +80,7 @@ func (r *EtcdRegistry) UpdateEngineVersion(from, to int) error {
7980
opts = &etcd.SetOptions{
8081
PrevExist: etcd.PrevNoExist,
8182
}
82-
_, err = r.kAPI.Set(r.ctx(), key, strTo, opts)
83+
_, err = r.kAPI.Set(context.Background(), key, strTo, opts)
8384
return err
8485
}
8586

0 commit comments

Comments
 (0)