Skip to content

Commit 1dcb39d

Browse files
authored
Merge pull request #799 from kinvolk/imran/backport-790-to-release-0.33
Backporting #790 to release-0.33 from kinvolk/imran/fix-mem-leak-pending-dials
2 parents 18deb66 + 26296d3 commit 1dcb39d

File tree

1 file changed

+38
-17
lines changed

1 file changed

+38
-17
lines changed

pkg/server/tunnel.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8080
if err != nil {
8181
klog.ErrorS(err, "no tunnels available")
8282
conn.Write([]byte(fmt.Sprintf("HTTP/1.1 500 Internal Server Error\r\nContent-Type: text/plain\r\n\r\ncurrently no tunnels available: %v", err)))
83-
conn.Close()
83+
// The hijacked connection will be closed by the closeOnce defer.
8484
return
8585
}
8686
closed := make(chan struct{})
@@ -100,39 +100,60 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
100100
agentID: backend.GetAgentID(),
101101
}
102102
t.Server.PendingDial.Add(random, connection)
103+
104+
// This defer acts as a safeguard to ensure we clean up the pending dial
105+
// if the connection is never successfully established.
106+
established := false
107+
defer func() {
108+
if !established {
109+
if t.Server.PendingDial.Remove(random) != nil {
110+
// This metric is observed only when the frontend closes the connection.
111+
// Other failure reasons are observed elsewhere.
112+
metrics.Metrics.ObserveDialFailure(metrics.DialFailureFrontendClose)
113+
}
114+
}
115+
}()
116+
103117
if err := backend.Send(dialRequest); err != nil {
104118
klog.ErrorS(err, "failed to tunnel dial request", "host", r.Host, "dialID", connection.dialID, "agentID", connection.agentID)
119+
metrics.Metrics.ObserveDialFailure(metrics.DialFailureBackendClose)
105120
// Send proper HTTP error response
106121
conn.Write([]byte(fmt.Sprintf("HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain; charset=utf-8\r\n\r\nFailed to tunnel dial request: %v\r\n", err)))
107-
conn.Close()
108-
return
109-
}
110-
ctxt := backend.Context()
111-
if ctxt.Err() != nil {
112-
klog.ErrorS(ctxt.Err(), "context reports failure")
113-
conn.Write([]byte(fmt.Sprintf("HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain; charset=utf-8\r\n\r\nBackend context error: %v\r\n", ctxt.Err())))
114-
conn.Close()
122+
// The deferred cleanup will run when we return here.
115123
return
116124
}
117125

118-
select {
119-
case <-ctxt.Done():
120-
klog.V(5).Infoln("context reports done")
121-
default:
122-
}
126+
ctxt := backend.Context()
123127

124128
select {
125129
case <-connection.connected: // Waiting for response before we begin full communication.
130+
// The connection is successful. Mark it as established so the deferred
131+
// cleanup function knows not to remove it from PendingDial.
132+
established = true
133+
126134
// Now that connection is established, send 200 OK to switch to tunnel mode
127135
_, err = conn.Write([]byte("HTTP/1.1 200 Connection Established\r\n\r\n"))
128136
if err != nil {
129137
klog.ErrorS(err, "failed to send 200 connection established", "host", r.Host, "agentID", connection.agentID)
130-
conn.Close()
138+
// We return here, but since `established` is true, the deferred
139+
// function will not remove the pending dial. The agent-side goroutine
140+
// is responsible for the established connection now.
131141
return
132142
}
133143
klog.V(3).InfoS("Connection established, sent 200 OK", "host", r.Host, "agentID", connection.agentID, "connectionID", connection.connectID)
134144

135-
case <-closed: // Connection was closed before being established
145+
case <-closed: // Connection was closed by the client before being established
146+
klog.V(2).InfoS("Frontend connection closed before being established", "host", r.Host, "dialID", connection.dialID, "agentID", connection.agentID)
147+
// The deferred cleanup will run when we return here.
148+
return
149+
150+
case <-ctxt.Done(): // Backend connection died before being established
151+
klog.ErrorS(ctxt.Err(), "backend context closed before connection was established", "host", r.Host, "dialID", connection.dialID, "agentID", connection.agentID)
152+
metrics.Metrics.ObserveDialFailure(metrics.DialFailureBackendClose)
153+
// Send proper HTTP error response
154+
conn.Write([]byte(fmt.Sprintf("HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain; charset=utf-8\r\n\r\nBackend context error: %v\r\n", ctxt.Err())))
155+
// The deferred cleanup will run when we return here.
156+
return
136157
}
137158

138159
defer func() {
@@ -148,7 +169,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
148169
if err = backend.Send(packet); err != nil {
149170
klog.V(2).InfoS("failed to send close request packet", "host", r.Host, "agentID", connection.agentID, "connectionID", connection.connectID)
150171
}
151-
conn.Close()
172+
// The top-level defer handles conn.Close()
152173
}()
153174

154175
connID := connection.connectID

0 commit comments

Comments
 (0)