Skip to content

Commit 061dad1

Browse files
Add DialTimeout, ReadTimeout, WriteTimeout
Add configurable timeouts for connection establishment and read/write operations in the Client struct
1 parent 52a98c7 commit 061dad1

File tree

6 files changed

+167
-47
lines changed

6 files changed

+167
-47
lines changed

.golangci.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ linters:
66
enable:
77
- revive
88
- bodyclose
9-
- exportloopref
109
- gocognit
1110
- goconst
1211
- gofmt
@@ -22,7 +21,6 @@ issues:
2221
- path: _test\.go
2322
linters:
2423
- errcheck
25-
- exportloopref
2624
- gocognit
2725
- goconst
2826
- gosec

cmd/bm/fluent_forward_go/bm_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func Benchmark_Fluent_Forward_Go_SendOnly(b *testing.B) {
1515
tagVar := "bar"
1616

1717
c := client.New(client.ConnectionOptions{
18-
ConnectionTimeout: 3 * time.Second,
18+
DialTimeout: 3 * time.Second,
1919
})
2020

2121
err := c.Connect()
@@ -43,7 +43,7 @@ func Benchmark_Fluent_Forward_Go_SingleMessage(b *testing.B) {
4343
tagVar := "bar"
4444

4545
c := client.New(client.ConnectionOptions{
46-
ConnectionTimeout: 3 * time.Second,
46+
DialTimeout: 3 * time.Second,
4747
})
4848

4949
err := c.Connect()
@@ -71,8 +71,8 @@ func Benchmark_Fluent_Forward_Go_SingleMessageAck(b *testing.B) {
7171
tagVar := "foo"
7272

7373
c := client.New(client.ConnectionOptions{
74-
RequireAck: true,
75-
ConnectionTimeout: 3 * time.Second,
74+
RequireAck: true,
75+
DialTimeout: 3 * time.Second,
7676
})
7777

7878
err := c.Connect()
@@ -100,7 +100,7 @@ func Benchmark_Fluent_Forward_Go_Bytes(b *testing.B) {
100100
tagVar := "foo"
101101

102102
c := client.New(client.ConnectionOptions{
103-
ConnectionTimeout: 3 * time.Second,
103+
DialTimeout: 3 * time.Second,
104104
})
105105

106106
err := c.Connect()
@@ -130,8 +130,8 @@ func Benchmark_Fluent_Forward_Go_BytesAck(b *testing.B) {
130130
tagVar := "foo"
131131

132132
c := client.New(client.ConnectionOptions{
133-
RequireAck: true,
134-
ConnectionTimeout: 3 * time.Second,
133+
RequireAck: true,
134+
DialTimeout: 3 * time.Second,
135135
})
136136

137137
err := c.Connect()
@@ -162,7 +162,7 @@ func Benchmark_Fluent_Forward_Go_RawMessage(b *testing.B) {
162162
tagVar := "foo"
163163

164164
c := client.New(client.ConnectionOptions{
165-
ConnectionTimeout: 3 * time.Second,
165+
DialTimeout: 3 * time.Second,
166166
})
167167

168168
err := c.Connect()
@@ -193,8 +193,8 @@ func Benchmark_Fluent_Forward_Go_RawMessageAck(b *testing.B) {
193193
tagVar := "foo"
194194

195195
c := client.New(client.ConnectionOptions{
196-
RequireAck: true,
197-
ConnectionTimeout: 3 * time.Second,
196+
RequireAck: true,
197+
DialTimeout: 3 * time.Second,
198198
})
199199

200200
err := c.Connect()
@@ -226,7 +226,7 @@ func Benchmark_Fluent_Forward_Go_CompressedMessage(b *testing.B) {
226226
tagVar := "foo"
227227

228228
c := client.New(client.ConnectionOptions{
229-
ConnectionTimeout: 3 * time.Second,
229+
DialTimeout: 3 * time.Second,
230230
})
231231

232232
err := c.Connect()
@@ -280,8 +280,8 @@ func Benchmark_Fluent_Forward_Go_CompressedMessageAck(b *testing.B) {
280280
tagVar := "foo"
281281

282282
c := client.New(client.ConnectionOptions{
283-
RequireAck: true,
284-
ConnectionTimeout: 3 * time.Second,
283+
RequireAck: true,
284+
DialTimeout: 3 * time.Second,
285285
})
286286

287287
err := c.Connect()

fluent/client/client.go

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ import (
4141
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
4242

4343
const (
44-
DefaultConnectionTimeout time.Duration = 60 * time.Second
44+
DefaultDialTimeout time.Duration = 10 * time.Second
45+
DefaultReadTimeout time.Duration = 30 * time.Second
46+
DefaultWriteTimeout time.Duration = 30 * time.Second
4547
)
4648

4749
// MessageClient implementations send MessagePack messages to a peer
@@ -71,23 +73,24 @@ type ConnectionFactory interface {
7173

7274
type Client struct {
7375
ConnectionFactory
74-
RequireAck bool
75-
Timeout time.Duration
76-
AuthInfo AuthInfo
77-
Hostname string
78-
session *Session
79-
ackLock sync.Mutex
80-
sessionLock sync.RWMutex
76+
RequireAck bool
77+
DialTimeout time.Duration
78+
ReadTimeout time.Duration
79+
WriteTimeout time.Duration
80+
AuthInfo AuthInfo
81+
Hostname string
82+
session *Session
83+
ackLock sync.Mutex
84+
sessionLock sync.RWMutex
8185
}
8286

8387
type ConnectionOptions struct {
84-
Factory ConnectionFactory
85-
RequireAck bool
86-
ConnectionTimeout time.Duration
87-
// TODO:
88-
// ReadTimeout time.Duration
89-
// WriteTimeout time.Duration
90-
AuthInfo AuthInfo
88+
Factory ConnectionFactory
89+
RequireAck bool
90+
DialTimeout time.Duration
91+
ReadTimeout time.Duration
92+
WriteTimeout time.Duration
93+
AuthInfo AuthInfo
9194
}
9295

9396
type AuthInfo struct {
@@ -110,15 +113,26 @@ func New(opts ConnectionOptions) *Client {
110113
}
111114
}
112115

113-
if opts.ConnectionTimeout == 0 {
114-
opts.ConnectionTimeout = DefaultConnectionTimeout
116+
// Set default timeouts if not provided
117+
if opts.DialTimeout == 0 {
118+
opts.DialTimeout = DefaultDialTimeout
119+
}
120+
121+
if opts.ReadTimeout == 0 {
122+
opts.ReadTimeout = DefaultReadTimeout
123+
}
124+
125+
if opts.WriteTimeout == 0 {
126+
opts.WriteTimeout = DefaultWriteTimeout
115127
}
116128

117129
return &Client{
118130
ConnectionFactory: factory,
119131
AuthInfo: opts.AuthInfo,
120132
RequireAck: opts.RequireAck,
121-
Timeout: opts.ConnectionTimeout,
133+
DialTimeout: opts.DialTimeout,
134+
ReadTimeout: opts.ReadTimeout,
135+
WriteTimeout: opts.WriteTimeout,
122136
}
123137
}
124138

@@ -164,6 +178,13 @@ func (c *Client) Handshake() error {
164178

165179
var helo protocol.Helo
166180

181+
// apply read timeout for reading helo message
182+
if c.ReadTimeout != 0 {
183+
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
184+
return err
185+
}
186+
}
187+
167188
r := msgp.NewReader(c.session.Connection)
168189
err := helo.DecodeMsg(r)
169190

@@ -183,13 +204,27 @@ func (c *Client) Handshake() error {
183204
return err
184205
}
185206

207+
// apply write timeout for sending the ping message
208+
if c.WriteTimeout != 0 {
209+
if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
210+
return err
211+
}
212+
}
213+
186214
err = msgp.Encode(c.session.Connection, ping)
187215
if err != nil {
188216
return err
189217
}
190218

191219
var pong protocol.Pong
192220

221+
// apply read timeout for receiving the pong message
222+
if c.ReadTimeout != 0 {
223+
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
224+
return err
225+
}
226+
}
227+
193228
err = pong.DecodeMsg(r)
194229
if err != nil {
195230
return err
@@ -246,8 +281,8 @@ func (c *Client) Reconnect() error {
246281
}
247282

248283
func (c *Client) checkAck(chunk string) error {
249-
if c.Timeout != 0 {
250-
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.Timeout)); err != nil {
284+
if c.ReadTimeout != 0 {
285+
if err := c.session.Connection.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
251286
return err
252287
}
253288
}
@@ -292,6 +327,13 @@ func (c *Client) Send(e protocol.ChunkEncoder) error {
292327
defer c.ackLock.Unlock()
293328
}
294329

330+
// apply write timeout for sending the message
331+
if c.WriteTimeout != 0 {
332+
if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
333+
return err
334+
}
335+
}
336+
295337
err = msgp.Encode(c.session.Connection, e)
296338
if err != nil || !c.RequireAck {
297339
return err
@@ -315,6 +357,13 @@ func (c *Client) SendRaw(m []byte) error {
315357
return errors.New("session handshake not completed")
316358
}
317359

360+
// apply write timeout for write
361+
if c.WriteTimeout != 0 {
362+
if err := c.session.Connection.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
363+
return err
364+
}
365+
}
366+
318367
_, err := c.session.Connection.Write(m)
319368

320369
return err

fluent/client/client_test.go

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ var _ = Describe("Client", func() {
5050
factory = &clientfakes.FakeConnectionFactory{}
5151

5252
opts := ConnectionOptions{
53-
Factory: factory,
54-
ConnectionTimeout: 2 * time.Second,
53+
Factory: factory,
54+
DialTimeout: 2 * time.Second,
5555
}
5656

5757
client = New(opts)
@@ -184,6 +184,77 @@ var _ = Describe("Client", func() {
184184
// TODO: We need a test that no message is sent
185185
})
186186

187+
Context("When WriteDeadline is set and write takes too long", func() {
188+
BeforeEach(func() {
189+
opts := ConnectionOptions{
190+
Factory: factory,
191+
DialTimeout: 2 * time.Second,
192+
WriteTimeout: 100 * time.Millisecond, // short timeout
193+
RequireAck: false,
194+
}
195+
client = New(opts)
196+
clientSide, serverSide = net.Pipe()
197+
factory.NewReturns(clientSide, nil)
198+
})
199+
200+
It("returns a write timeout error", func() {
201+
go func() {
202+
defer GinkgoRecover()
203+
time.Sleep(200 * time.Millisecond) // this is longer than WriteTimeout
204+
buf := make([]byte, 1024)
205+
_, err := serverSide.Read(buf)
206+
Expect(err).NotTo(HaveOccurred())
207+
}()
208+
209+
err := client.Send(&msg)
210+
Expect(err).To(HaveOccurred())
211+
Expect(err.Error()).To(ContainSubstring("write pipe: i/o timeout"))
212+
})
213+
})
214+
215+
Context("When ReadDeadline is set and acknowledgment is delayed", func() {
216+
BeforeEach(func() {
217+
opts := ConnectionOptions{
218+
Factory: factory,
219+
DialTimeout: 2 * time.Second,
220+
ReadTimeout: 100 * time.Millisecond, // short timeout
221+
RequireAck: true, // waig for acknowledgment
222+
}
223+
client = New(opts)
224+
clientSide, serverSide = net.Pipe()
225+
factory.NewReturns(clientSide, nil)
226+
})
227+
228+
It("returns a read timeout error", func() {
229+
done := make(chan bool) // used to synchronize test
230+
go func() {
231+
defer GinkgoRecover()
232+
defer func() {
233+
done <- true
234+
}()
235+
err := client.Send(&msg)
236+
Expect(err).To(HaveOccurred())
237+
Expect(err.Error()).To(ContainSubstring("read pipe: i/o timeout"))
238+
}()
239+
240+
// Server reads the message
241+
rcvd := &protocol.MessageExt{}
242+
err := rcvd.DecodeMsg(msgp.NewReader(serverSide))
243+
Expect(err).NotTo(HaveOccurred())
244+
245+
// delay sending the ack (longer than ReadTimeout)
246+
time.Sleep(200 * time.Millisecond)
247+
248+
// send the ack
249+
chunk := rcvd.Options.Chunk
250+
ack := &protocol.AckMessage{Ack: chunk}
251+
err = ack.EncodeMsg(msgp.NewWriter(serverSide))
252+
Expect(err).NotTo(HaveOccurred())
253+
254+
<-done
255+
})
256+
})
257+
187258
Context("RequireAck is true", func() {
188259
var (
189260
serverSide net.Conn
@@ -215,11 +286,11 @@ var _ = Describe("Client", func() {
215286
Expect(err).ToNot(HaveOccurred())
216287
}()
217288

218-
rcvd := &protocol.MessageExt{}
219-
err := rcvd.DecodeMsg(serverReader)
289+
msgext := &protocol.MessageExt{}
290+
err := msgext.DecodeMsg(serverReader)
220291
Expect(err).ToNot(HaveOccurred())
221292

222-
chunk := rcvd.Options.Chunk
293+
chunk := msgext.Options.Chunk
223294
Expect(chunk).ToNot(BeEmpty())
224295
Expect(chunk).To(Equal(msg.Options.Chunk))
225296

@@ -241,12 +312,12 @@ var _ = Describe("Client", func() {
241312
Expect(err.Error()).To(ContainSubstring("Expected chunk"))
242313
}()
243314

244-
rcvd := &protocol.MessageExt{}
315+
msgext := &protocol.MessageExt{}
245316
serverSide.SetReadDeadline(time.Now().Add(time.Second))
246-
err := rcvd.DecodeMsg(serverReader)
317+
err := msgext.DecodeMsg(serverReader)
247318
Expect(err).ToNot(HaveOccurred())
248319

249-
chunk := rcvd.Options.Chunk
320+
chunk := msgext.Options.Chunk
250321
Expect(chunk).ToNot(BeEmpty())
251322
Expect(chunk).To(Equal(msg.Options.Chunk))
252323

0 commit comments

Comments
 (0)