Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

Commit 1882d3c

Browse files
author
Dongsu Park
committed
Merge pull request #1580 from endocode/dongsu/etcd-request-timeout
registry: use etcd.Config.HeaderTimeoutPerRequest instead of internal timeout
2 parents 77dd54e + 36094ff commit 1882d3c

File tree

10 files changed

+53
-57
lines changed

10 files changed

+53
-57
lines changed

fleetctl/fleetctl.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,9 @@ func getRegistryClient() (client.API, error) {
462462
}
463463

464464
eCfg := etcd.Config{
465-
Endpoints: strings.Split(globalFlags.Endpoint, ","),
466-
Transport: trans,
465+
Endpoints: strings.Split(globalFlags.Endpoint, ","),
466+
Transport: trans,
467+
HeaderTimeoutPerRequest: getRequestTimeoutFlag(),
467468
}
468469

469470
eClient, err := etcd.New(eCfg)
@@ -472,7 +473,7 @@ func getRegistryClient() (client.API, error) {
472473
}
473474

474475
kAPI := etcd.NewKeysAPI(eClient)
475-
reg := registry.NewEtcdRegistry(kAPI, globalFlags.EtcdKeyPrefix, getRequestTimeoutFlag())
476+
reg := registry.NewEtcdRegistry(kAPI, globalFlags.EtcdKeyPrefix)
476477

477478
if msg, ok := checkVersion(reg); !ok {
478479
stderr(msg)

pkg/lease/etcd.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,16 @@ func serializeLeaseMetadata(machID string, ver int) (string, error) {
9797
}
9898

9999
type etcdLeaseManager struct {
100-
kAPI etcd.KeysAPI
101-
keyPrefix string
102-
reqTimeout time.Duration
100+
kAPI etcd.KeysAPI
101+
keyPrefix string
103102
}
104103

105-
func NewEtcdLeaseManager(kAPI etcd.KeysAPI, keyPrefix string, reqTimeout time.Duration) *etcdLeaseManager {
106-
return &etcdLeaseManager{kAPI: kAPI, keyPrefix: keyPrefix, reqTimeout: reqTimeout}
104+
func NewEtcdLeaseManager(kAPI etcd.KeysAPI, keyPrefix string) *etcdLeaseManager {
105+
return &etcdLeaseManager{kAPI: kAPI, keyPrefix: keyPrefix}
107106
}
108107

109108
func (r *etcdLeaseManager) ctx() context.Context {
110-
ctx, _ := context.WithTimeout(context.Background(), r.reqTimeout)
111-
return ctx
109+
return context.Background()
112110
}
113111

114112
func (r *etcdLeaseManager) leasePath(name string) string {

registry/etcd.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,23 @@ package registry
1616

1717
import (
1818
"path"
19-
"time"
2019

2120
etcd "github.com/coreos/etcd/client"
22-
"golang.org/x/net/context"
2321
)
2422

2523
const DefaultKeyPrefix = "/_coreos.com/fleet/"
2624

27-
func NewEtcdRegistry(kAPI etcd.KeysAPI, keyPrefix string, reqTimeout time.Duration) *EtcdRegistry {
25+
func NewEtcdRegistry(kAPI etcd.KeysAPI, keyPrefix string) *EtcdRegistry {
2826
return &EtcdRegistry{
29-
kAPI: kAPI,
30-
keyPrefix: keyPrefix,
31-
reqTimeout: reqTimeout,
27+
kAPI: kAPI,
28+
keyPrefix: keyPrefix,
3229
}
3330
}
3431

3532
// EtcdRegistry fulfils the Registry interface and uses etcd as a backend
3633
type EtcdRegistry struct {
37-
kAPI etcd.KeysAPI
38-
keyPrefix string
39-
reqTimeout time.Duration
40-
}
41-
42-
func (r *EtcdRegistry) ctx() context.Context {
43-
ctx, _ := context.WithTimeout(context.Background(), r.reqTimeout)
44-
return ctx
34+
kAPI etcd.KeysAPI
35+
keyPrefix string
4536
}
4637

4738
func (r *EtcdRegistry) prefixed(p ...string) string {

registry/job.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sort"
2222

2323
etcd "github.com/coreos/etcd/client"
24+
"golang.org/x/net/context"
2425

2526
"github.com/coreos/fleet/job"
2627
"github.com/coreos/fleet/log"
@@ -38,7 +39,7 @@ func (r *EtcdRegistry) Schedule() ([]job.ScheduledUnit, error) {
3839
Sort: true,
3940
Recursive: true,
4041
}
41-
res, err := r.kAPI.Get(r.ctx(), key, opts)
42+
res, err := r.kAPI.Get(context.Background(), key, opts)
4243
if err != nil {
4344
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
4445
err = nil
@@ -94,7 +95,7 @@ func (r *EtcdRegistry) Units() ([]job.Unit, error) {
9495
Sort: true,
9596
Recursive: true,
9697
}
97-
res, err := r.kAPI.Get(r.ctx(), key, opts)
98+
res, err := r.kAPI.Get(context.Background(), key, opts)
9899
if err != nil {
99100
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
100101
err = nil
@@ -142,7 +143,7 @@ func (r *EtcdRegistry) Unit(name string) (*job.Unit, error) {
142143
opts := &etcd.GetOptions{
143144
Recursive: true,
144145
}
145-
res, err := r.kAPI.Get(r.ctx(), key, opts)
146+
res, err := r.kAPI.Get(context.Background(), key, opts)
146147
if err != nil {
147148
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
148149
err = nil
@@ -192,7 +193,7 @@ func (r *EtcdRegistry) ScheduledUnit(name string) (*job.ScheduledUnit, error) {
192193
opts := &etcd.GetOptions{
193194
Recursive: true,
194195
}
195-
res, err := r.kAPI.Get(r.ctx(), key, opts)
196+
res, err := r.kAPI.Get(context.Background(), key, opts)
196197
if err != nil {
197198
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
198199
err = nil
@@ -224,7 +225,7 @@ func (r *EtcdRegistry) UnscheduleUnit(name, machID string) error {
224225
opts := &etcd.DeleteOptions{
225226
PrevValue: machID,
226227
}
227-
_, err := r.kAPI.Delete(r.ctx(), key, opts)
228+
_, err := r.kAPI.Delete(context.Background(), key, opts)
228229
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
229230
err = nil
230231
}
@@ -297,7 +298,7 @@ func (r *EtcdRegistry) DestroyUnit(name string) error {
297298
opts := &etcd.DeleteOptions{
298299
Recursive: true,
299300
}
300-
_, err := r.kAPI.Delete(r.ctx(), key, opts)
301+
_, err := r.kAPI.Delete(context.Background(), key, opts)
301302
if err != nil {
302303
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
303304
err = errors.New("job does not exist")
@@ -332,7 +333,7 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) {
332333
PrevExist: etcd.PrevIgnore,
333334
}
334335
key := r.prefixed(jobPrefix, u.Name, "object")
335-
_, err = r.kAPI.Set(r.ctx(), key, val, opts)
336+
_, err = r.kAPI.Set(context.Background(), key, val, opts)
336337
if err != nil {
337338
return
338339
}
@@ -342,7 +343,7 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) {
342343

343344
func (r *EtcdRegistry) SetUnitTargetState(name string, state job.JobState) error {
344345
key := r.jobTargetStatePath(name)
345-
_, err := r.kAPI.Set(r.ctx(), key, string(state), nil)
346+
_, err := r.kAPI.Set(context.Background(), key, string(state), nil)
346347
return err
347348
}
348349

@@ -351,7 +352,7 @@ func (r *EtcdRegistry) ScheduleUnit(name string, machID string) error {
351352
opts := &etcd.SetOptions{
352353
PrevExist: etcd.PrevNoExist,
353354
}
354-
_, err := r.kAPI.Set(r.ctx(), key, machID, opts)
355+
_, err := r.kAPI.Set(context.Background(), key, machID, opts)
355356
return err
356357
}
357358

registry/job_state.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
etcd "github.com/coreos/etcd/client"
21+
"golang.org/x/net/context"
2122

2223
"github.com/coreos/fleet/job"
2324
"github.com/coreos/fleet/unit"
@@ -51,13 +52,13 @@ func (r *EtcdRegistry) UnitHeartbeat(name, machID string, ttl time.Duration) err
5152
opts := &etcd.SetOptions{
5253
TTL: ttl,
5354
}
54-
_, err := r.kAPI.Set(r.ctx(), key, machID, opts)
55+
_, err := r.kAPI.Set(context.Background(), key, machID, opts)
5556
return err
5657
}
5758

5859
func (r *EtcdRegistry) ClearUnitHeartbeat(name string) {
5960
key := r.jobHeartbeatPath(name)
60-
r.kAPI.Delete(r.ctx(), key, nil)
61+
r.kAPI.Delete(context.Background(), key, nil)
6162
}
6263

6364
func (r *EtcdRegistry) jobHeartbeatPath(jobName string) string {

registry/machine.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
etcd "github.com/coreos/etcd/client"
22+
"golang.org/x/net/context"
2223

2324
"github.com/coreos/fleet/machine"
2425
)
@@ -34,7 +35,7 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) {
3435
Recursive: true,
3536
}
3637

37-
resp, err := r.kAPI.Get(r.ctx(), key, opts)
38+
resp, err := r.kAPI.Get(context.Background(), key, opts)
3839
if err != nil {
3940
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
4041
err = nil
@@ -72,7 +73,7 @@ func (r *EtcdRegistry) CreateMachineState(ms machine.MachineState, ttl time.Dura
7273
PrevExist: etcd.PrevNoExist,
7374
TTL: ttl,
7475
}
75-
resp, err := r.kAPI.Set(r.ctx(), key, val, opts)
76+
resp, err := r.kAPI.Set(context.Background(), key, val, opts)
7677
if err != nil {
7778
return uint64(0), err
7879
}
@@ -91,7 +92,7 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio
9192
PrevExist: etcd.PrevExist,
9293
TTL: ttl,
9394
}
94-
resp, err := r.kAPI.Set(r.ctx(), key, val, opts)
95+
resp, err := r.kAPI.Set(context.Background(), key, val, opts)
9596
if err == nil {
9697
return resp.Node.ModifiedIndex, nil
9798
}
@@ -100,7 +101,7 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio
100101
// in the cluster know this is a new member
101102
opts.PrevExist = etcd.PrevNoExist
102103

103-
resp, err = r.kAPI.Set(r.ctx(), key, val, opts)
104+
resp, err = r.kAPI.Set(context.Background(), key, val, opts)
104105
if err != nil {
105106
return uint64(0), err
106107
}
@@ -110,7 +111,7 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio
110111

111112
func (r *EtcdRegistry) RemoveMachineState(machID string) error {
112113
key := r.prefixed(machinePrefix, machID, "object")
113-
_, err := r.kAPI.Delete(r.ctx(), key, nil)
114+
_, err := r.kAPI.Delete(context.Background(), key, nil)
114115
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
115116
err = nil
116117
}

registry/unit.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
etcd "github.com/coreos/etcd/client"
22+
"golang.org/x/net/context"
2223

2324
"github.com/coreos/fleet/log"
2425
"github.com/coreos/fleet/metrics"
@@ -44,7 +45,7 @@ func (r *EtcdRegistry) storeOrGetUnitFile(u unit.UnitFile) (err error) {
4445
PrevExist: etcd.PrevNoExist,
4546
}
4647
start := time.Now()
47-
_, err = r.kAPI.Set(r.ctx(), key, val, opts)
48+
_, err = r.kAPI.Set(context.Background(), key, val, opts)
4849
// unit is already stored
4950
if isEtcdError(err, etcd.ErrorCodeNodeExist) {
5051
// TODO(jonboulle): verify more here?
@@ -65,7 +66,7 @@ func (r *EtcdRegistry) getUnitByHash(hash unit.Hash) *unit.UnitFile {
6566
Recursive: true,
6667
}
6768
start := time.Now()
68-
resp, err := r.kAPI.Get(r.ctx(), key, opts)
69+
resp, err := r.kAPI.Get(context.Background(), key, opts)
6970
if err != nil {
7071
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
7172
err = nil
@@ -86,7 +87,7 @@ func (r *EtcdRegistry) getAllUnitsHashMap() (map[string]*unit.UnitFile, error) {
8687
}
8788
hashToUnit := map[string]*unit.UnitFile{}
8889
start := time.Now()
89-
resp, err := r.kAPI.Get(r.ctx(), key, opts)
90+
resp, err := r.kAPI.Get(context.Background(), key, opts)
9091
if err != nil {
9192
metrics.ReportRegistryOpFailure(metrics.GetAll)
9293
return nil, err

registry/unit_state.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
etcd "github.com/coreos/etcd/client"
23+
"golang.org/x/net/context"
2324

2425
"github.com/coreos/fleet/log"
2526
"github.com/coreos/fleet/machine"
@@ -98,7 +99,7 @@ func (r *EtcdRegistry) statesByMUSKey() (map[MUSKey]*unit.UnitState, error) {
9899
opts := &etcd.GetOptions{
99100
Recursive: true,
100101
}
101-
res, err := r.kAPI.Get(r.ctx(), key, opts)
102+
res, err := r.kAPI.Get(context.Background(), key, opts)
102103
if err != nil && !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
103104
return nil, err
104105
}
@@ -127,7 +128,7 @@ func (r *EtcdRegistry) statesByMUSKey() (map[MUSKey]*unit.UnitState, error) {
127128
// given unit that originates from the indicated machine
128129
func (r *EtcdRegistry) getUnitState(uName, machID string) (*unit.UnitState, error) {
129130
key := r.unitStatePath(machID, uName)
130-
res, err := r.kAPI.Get(r.ctx(), key, nil)
131+
res, err := r.kAPI.Get(context.Background(), key, nil)
131132
if err != nil {
132133
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
133134
err = nil
@@ -162,17 +163,17 @@ func (r *EtcdRegistry) SaveUnitState(jobName string, unitState *unit.UnitState,
162163
}
163164

164165
legacyKey := r.legacyUnitStatePath(jobName)
165-
r.kAPI.Set(r.ctx(), legacyKey, val, opts)
166+
r.kAPI.Set(context.Background(), legacyKey, val, opts)
166167

167168
newKey := r.unitStatePath(unitState.MachineID, jobName)
168-
r.kAPI.Set(r.ctx(), newKey, val, opts)
169+
r.kAPI.Set(context.Background(), newKey, val, opts)
169170
}
170171

171172
// Delete the state from the Registry for the given Job's Unit
172173
func (r *EtcdRegistry) RemoveUnitState(jobName string) error {
173174
// TODO(jonboulle): consider https://github.com/coreos/fleet/issues/465
174175
legacyKey := r.legacyUnitStatePath(jobName)
175-
_, err := r.kAPI.Delete(r.ctx(), legacyKey, nil)
176+
_, err := r.kAPI.Delete(context.Background(), legacyKey, nil)
176177
if err != nil && !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
177178
return err
178179
}
@@ -182,7 +183,7 @@ func (r *EtcdRegistry) RemoveUnitState(jobName string) error {
182183
opts := &etcd.DeleteOptions{
183184
Recursive: true,
184185
}
185-
_, err = r.kAPI.Delete(r.ctx(), newKey, opts)
186+
_, err = r.kAPI.Delete(context.Background(), newKey, opts)
186187
if err != nil && !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
187188
return err
188189
}

registry/version.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
etcd "github.com/coreos/etcd/client"
2121
"github.com/coreos/go-semver/semver"
22+
"golang.org/x/net/context"
2223
)
2324

2425
// LatestDaemonVersion attempts to retrieve the latest version of fleetd
@@ -46,7 +47,7 @@ func (r *EtcdRegistry) LatestDaemonVersion() (*semver.Version, error) {
4647

4748
// EngineVersion implements the ClusterRegistry interface
4849
func (r *EtcdRegistry) EngineVersion() (int, error) {
49-
res, err := r.kAPI.Get(r.ctx(), r.engineVersionPath(), nil)
50+
res, err := r.kAPI.Get(context.Background(), r.engineVersionPath(), nil)
5051
if err != nil {
5152
// no big deal, either the cluster is new or is just
5253
// upgrading from old unversioned code
@@ -69,7 +70,7 @@ func (r *EtcdRegistry) UpdateEngineVersion(from, to int) error {
6970
opts := &etcd.SetOptions{
7071
PrevValue: strFrom,
7172
}
72-
_, err := r.kAPI.Set(r.ctx(), key, strTo, opts)
73+
_, err := r.kAPI.Set(context.Background(), key, strTo, opts)
7374
if err == nil {
7475
return nil
7576
} else if !isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
@@ -79,7 +80,7 @@ func (r *EtcdRegistry) UpdateEngineVersion(from, to int) error {
7980
opts = &etcd.SetOptions{
8081
PrevExist: etcd.PrevNoExist,
8182
}
82-
_, err = r.kAPI.Set(r.ctx(), key, strTo, opts)
83+
_, err = r.kAPI.Set(context.Background(), key, strTo, opts)
8384
return err
8485
}
8586

0 commit comments

Comments
 (0)