Skip to content

Commit c19c097

Browse files
Albin Kerouantonakerouanton
Albin Kerouanton
authored andcommitted
Test how the logger behaves in async mode
New testcases have been written to test if PostWithTime() and Close() are working as expected. More specifically, they test if Close() succeed with all combination of options (Async + ForceStopAsyncSend). Tests on PostWithTime() are divided into two parts: * With transient network failure ; * And without any ; And tests on Close() are divided into two parts: * CloseOnAsyncConnect: When no logs have been written yet ; * CloseOnAsyncReconnect: When there're pending logs to write ; Finally, the test `Test_send_WritePendingToConn` has been removed because it was relying too much on the logger internals to test the logger's behavior from an API user PoV. And the init() that was starting a tcp listener has been removed to make the tests easier to execute (otherwise, we've to wait for go test to timeout before re-running it). Signed-off-by: Albin Kerouanton <[email protected]>
1 parent 28f6a3e commit c19c097

File tree

2 files changed

+415
-165
lines changed

2 files changed

+415
-165
lines changed

fluent/fluent.go

+29-9
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import (
1515
"bytes"
1616
"encoding/base64"
1717
"encoding/binary"
18-
"github.com/tinylib/msgp/msgp"
1918
"math/rand"
19+
20+
"github.com/tinylib/msgp/msgp"
2021
)
2122

2223
const (
@@ -84,6 +85,7 @@ type msgToSend struct {
8485
type Fluent struct {
8586
Config
8687

88+
dialer dialer
8789
stopRunning chan bool
8890
pending chan *msgToSend
8991
wg sync.WaitGroup
@@ -93,7 +95,20 @@ type Fluent struct {
9395
}
9496

9597
// New creates a new Logger.
96-
func New(config Config) (f *Fluent, err error) {
98+
func New(config Config) (*Fluent, error) {
99+
if config.Timeout == 0 {
100+
config.Timeout = defaultTimeout
101+
}
102+
return newWithDialer(config, &net.Dialer{
103+
Timeout: config.Timeout,
104+
})
105+
}
106+
107+
type dialer interface {
108+
Dial(string, string) (net.Conn, error)
109+
}
110+
111+
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
97112
if config.FluentNetwork == "" {
98113
config.FluentNetwork = defaultNetwork
99114
}
@@ -106,9 +121,6 @@ func New(config Config) (f *Fluent, err error) {
106121
if config.FluentSocketPath == "" {
107122
config.FluentSocketPath = defaultSocketPath
108123
}
109-
if config.Timeout == 0 {
110-
config.Timeout = defaultTimeout
111-
}
112124
if config.WriteTimeout == 0 {
113125
config.WriteTimeout = defaultWriteTimeout
114126
}
@@ -128,15 +140,20 @@ func New(config Config) (f *Fluent, err error) {
128140
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
129141
config.Async = config.Async || config.AsyncConnect
130142
}
143+
131144
if config.Async {
132145
f = &Fluent{
133146
Config: config,
147+
dialer: d,
134148
pending: make(chan *msgToSend, config.BufferLimit),
135149
}
136150
f.wg.Add(1)
137151
go f.run()
138152
} else {
139-
f = &Fluent{Config: config}
153+
f = &Fluent{
154+
Config: config,
155+
dialer: d,
156+
}
140157
err = f.connect()
141158
}
142159
return
@@ -340,12 +357,15 @@ func (f *Fluent) close(c net.Conn) {
340357

341358
// connect establishes a new connection using the specified transport.
342359
func (f *Fluent) connect() (err error) {
343-
344360
switch f.Config.FluentNetwork {
345361
case "tcp":
346-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
362+
f.conn, err = f.dialer.Dial(
363+
f.Config.FluentNetwork,
364+
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
347365
case "unix":
348-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
366+
f.conn, err = f.dialer.Dial(
367+
f.Config.FluentNetwork,
368+
f.Config.FluentSocketPath)
349369
default:
350370
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
351371
}

0 commit comments

Comments
 (0)