
Commit db62819

qingyang-hu authored and Benji Rewis committed
GODRIVER-2577 Retry heartbeat on timeout to prevent pool cleanup in FAAS pause. (#1133)
1 parent 08eea47 commit db62819
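
The change retries the server heartbeat once on a timeout before clearing the connection pool, so a paused FAAS (e.g. AWS Lambda) environment does not discard its connections after a single stalled check. As a rough sketch of what "timeout" means here (not driver code; isTimeoutErr is a hypothetical helper), a timeout is either a context cancellation/deadline error or a net.Error whose Timeout() reports true:

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// isTimeoutErr is a hypothetical helper mirroring the checks added in this
// commit: context cancellation/deadline errors and net.Errors that report
// Timeout() are treated as heartbeat timeouts worth one immediate retry.
func isTimeoutErr(err error) bool {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

func main() {
	fmt.Println(isTimeoutErr(context.DeadlineExceeded))       // true
	fmt.Println(isTimeoutErr(&net.DNSError{IsTimeout: true})) // true
	fmt.Println(isTimeoutErr(errors.New("connection reset"))) // false
}

Note the driver code in the diff below compares the unwrapped error directly against context.Canceled and context.DeadlineExceeded; the errors.Is/errors.As form above is only an illustrative equivalent.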

2 files changed: +195 −17 lines changed


x/mongo/driver/topology/server.go

Lines changed: 53 additions & 17 deletions
@@ -520,6 +520,7 @@ func (s *Server) update() {
 		}
 	}

+	timeoutCnt := 0
 	for {
 		// Check if the server is disconnecting. Even if waitForNextCheck has already read from the done channel, we
 		// can safely read from it again because Disconnect closes the channel.
@@ -545,18 +546,42 @@ func (s *Server) update() {
 			continue
 		}

-		// Must hold the processErrorLock while updating the server description and clearing the
-		// pool. Not holding the lock leads to possible out-of-order processing of pool.clear() and
-		// pool.ready() calls from concurrent server description updates.
-		s.processErrorLock.Lock()
-		s.updateDescription(desc)
-		if err := desc.LastError; err != nil {
-			// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
-			// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
-			// IDs.
-			s.pool.clear(err, nil)
+		if isShortcut := func() bool {
+			// Must hold the processErrorLock while updating the server description and clearing the
+			// pool. Not holding the lock leads to possible out-of-order processing of pool.clear() and
+			// pool.ready() calls from concurrent server description updates.
+			s.processErrorLock.Lock()
+			defer s.processErrorLock.Unlock()
+
+			s.updateDescription(desc)
+			// Retry after the first timeout before clearing the pool in case of a FAAS pause as
+			// described in GODRIVER-2577.
+			if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 1 {
+				if err == context.Canceled || err == context.DeadlineExceeded {
+					timeoutCnt++
+					// We want to immediately retry on timeout error. Continue to next loop.
+					return true
+				}
+				if err, ok := err.(net.Error); ok && err.Timeout() {
+					timeoutCnt++
+					// We want to immediately retry on timeout error. Continue to next loop.
+					return true
+				}
+			}
+			if err := desc.LastError; err != nil {
+				// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
+				// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
+				// IDs.
+				s.pool.clear(err, nil)
+			}
+			// We're either not handling a timeout error, or we just handled the 2nd consecutive
+			// timeout error. In either case, reset the timeout count to 0 and return false to
+			// continue the normal check process.
+			timeoutCnt = 0
+			return false
+		}(); isShortcut {
+			continue
 		}
-		s.processErrorLock.Unlock()

 		// If the server supports streaming or we're already streaming, we want to move to streaming the next response
 		// without waiting. If the server has transitioned to Unknown from a network error, we want to do another
@@ -707,19 +732,31 @@ func (s *Server) check() (description.Server, error) {
 	var err error
 	var durationNanos int64

-	// Create a new connection if this is the first check, the connection was closed after an error during the previous
-	// check, or the previous check was cancelled.
+	start := time.Now()
 	if s.conn == nil || s.conn.closed() || s.checkWasCancelled() {
+		// Create a new connection if this is the first check, the connection was closed after an error during the previous
+		// check, or the previous check was cancelled.
+		isNilConn := s.conn == nil
+		if !isNilConn {
+			s.publishServerHeartbeatStartedEvent(s.conn.ID(), false)
+		}
 		// Create a new connection and add it's handshake RTT as a sample.
 		err = s.setupHeartbeatConnection()
+		durationNanos = time.Since(start).Nanoseconds()
 		if err == nil {
 			// Use the description from the connection handshake as the value for this check.
 			s.rttMonitor.addSample(s.conn.helloRTT)
 			descPtr = &s.conn.desc
+			if !isNilConn {
+				s.publishServerHeartbeatSucceededEvent(s.conn.ID(), durationNanos, s.conn.desc, false)
+			}
+		} else {
+			err = unwrapConnectionError(err)
+			if !isNilConn {
+				s.publishServerHeartbeatFailedEvent(s.conn.ID(), durationNanos, err, false)
+			}
 		}
-	}
-
-	if descPtr == nil && err == nil {
+	} else {
 		// An existing connection is being used. Use the server description properties to execute the right heartbeat.

 		// Wrap conn in a type that implements driver.StreamerConnection.
@@ -729,7 +766,6 @@ func (s *Server) check() (description.Server, error) {
 	streamable := previousDescription.TopologyVersion != nil

 	s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)
-	start := time.Now()
 	switch {
 	case s.conn.getCurrentlyStreaming():
 		// The connection is already in a streaming state, so we stream the next response.
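
For illustration, here is a self-contained sketch of the retry-once-then-clear counting pattern introduced in Server.update above. The names demo and timedOut and the printed messages are hypothetical stand-ins, not driver APIs:

package main

import "fmt"

// demo walks a sequence of heartbeat outcomes (true = the check timed out) and
// applies the GODRIVER-2577 rule: tolerate one consecutive timeout, clear the
// pool only on the second.
func demo(results []bool) {
	timeoutCnt := 0
	for _, timedOut := range results {
		if timedOut && timeoutCnt < 1 {
			timeoutCnt++
			fmt.Println("timeout: retry immediately, pool kept")
			continue
		}
		if timedOut {
			fmt.Println("second consecutive timeout: pool cleared")
		}
		timeoutCnt = 0
	}
}

func main() {
	demo([]bool{true, false})       // an isolated timeout never clears the pool
	demo([]bool{true, true, false}) // two consecutive timeouts clear it once
}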

x/mongo/driver/topology/server_test.go

Lines changed: 142 additions & 0 deletions
@@ -11,8 +11,12 @@ package topology
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
 	"errors"
+	"io/ioutil"
 	"net"
+	"os"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -49,6 +53,144 @@ func (cncd *channelNetConnDialer) DialContext(_ context.Context, _, _ string) (n
 	return cnc, nil
 }

+type errorQueue struct {
+	errors []error
+	mutex  sync.Mutex
+}
+
+func (eq *errorQueue) head() error {
+	eq.mutex.Lock()
+	defer eq.mutex.Unlock()
+	if len(eq.errors) > 0 {
+		return eq.errors[0]
+	}
+	return nil
+}
+
+func (eq *errorQueue) dequeue() bool {
+	eq.mutex.Lock()
+	defer eq.mutex.Unlock()
+	if len(eq.errors) > 0 {
+		eq.errors = eq.errors[1:]
+		return true
+	}
+	return false
+}
+
+type timeoutConn struct {
+	net.Conn
+	errors *errorQueue
+}
+
+func (c *timeoutConn) Read(b []byte) (int, error) {
+	n, err := 0, c.errors.head()
+	if err == nil {
+		n, err = c.Conn.Read(b)
+	}
+	return n, err
+}
+
+func (c *timeoutConn) Write(b []byte) (int, error) {
+	n, err := 0, c.errors.head()
+	if err == nil {
+		n, err = c.Conn.Write(b)
+	}
+	return n, err
+}
+
+type timeoutDialer struct {
+	Dialer
+	errors *errorQueue
+}
+
+func (d *timeoutDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
+	c, e := d.Dialer.DialContext(ctx, network, address)
+
+	if caFile := os.Getenv("MONGO_GO_DRIVER_CA_FILE"); len(caFile) > 0 {
+		pem, err := ioutil.ReadFile(caFile)
+		if err != nil {
+			return nil, err
+		}
+
+		ca := x509.NewCertPool()
+		if !ca.AppendCertsFromPEM(pem) {
+			return nil, errors.New("unable to load CA file")
+		}
+
+		config := &tls.Config{
+			InsecureSkipVerify: true,
+			RootCAs:            ca,
+		}
+		c = tls.Client(c, config)
+	}
+	return &timeoutConn{c, d.errors}, e
+}
+
+// TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577.
+func TestServerHeartbeatTimeout(t *testing.T) {
+	networkTimeoutError := &net.DNSError{
+		IsTimeout: true,
+	}
+
+	testCases := []struct {
+		desc              string
+		ioErrors          []error
+		expectPoolCleared bool
+	}{
+		{
+			desc:              "one single timeout should not clear the pool",
+			ioErrors:          []error{nil, networkTimeoutError, nil, networkTimeoutError, nil},
+			expectPoolCleared: false,
+		},
+		{
+			desc:              "continuous timeouts should clear the pool",
+			ioErrors:          []error{nil, networkTimeoutError, networkTimeoutError, nil},
+			expectPoolCleared: true,
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.desc, func(t *testing.T) {
+			t.Parallel()
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+
+			errors := &errorQueue{errors: tc.ioErrors}
+			tpm := monitor.NewTestPoolMonitor()
+			server := NewServer(
+				address.Address("localhost:27017"),
+				primitive.NewObjectID(),
+				WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor {
+					return tpm.PoolMonitor
+				}),
+				WithConnectionOptions(func(opts ...ConnectionOption) []ConnectionOption {
+					return append(opts,
+						WithDialer(func(d Dialer) Dialer {
+							var dialer net.Dialer
+							return &timeoutDialer{&dialer, errors}
+						}))
+				}),
+				WithServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor {
+					return &event.ServerMonitor{
+						ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
+							if !errors.dequeue() {
+								wg.Done()
+							}
+						},
+					}
+				}),
+				WithHeartbeatInterval(func(time.Duration) time.Duration {
+					return 200 * time.Millisecond
+				}),
+			)
+			require.NoError(t, server.Connect(nil))
+			wg.Wait()
+			assert.Equal(t, tc.expectPoolCleared, tpm.IsPoolCleared(), "expected pool cleared to be %v but was %v", tc.expectPoolCleared, tpm.IsPoolCleared())
+		})
+	}
+}
+
 // TestServerConnectionTimeout tests how different timeout errors are handled during connection
 // creation and server handshake.
 func TestServerConnectionTimeout(t *testing.T) {
