Skip to content

Commit 323aa46

Browse files
committed
fix (pipes): check websocket errors inside CopyToWebsocket()
Previously we were treating EOF on the reader as no-error, meaning that operations like Kubernetes Describe would retry endlessly when finished.
1 parent 7f0f13b commit 323aa46

File tree

5 files changed

+29
-29
lines changed

5 files changed

+29
-29
lines changed

app/multitenant/consul_pipe_router.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func (pr *consulPipeRouter) privateAPI() {
256256
defer conn.Close()
257257

258258
end, _ := pipe.Ends()
259-
if err := pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
259+
if _, err := pipe.CopyToWebsocket(end, conn); err != nil {
260260
log.Errorf("%s: Server bridge connection; Error copying pipe to websocket: %v", key, err)
261261
}
262262
})
@@ -446,7 +446,7 @@ func (bc *bridgeConnection) loop() {
446446
bc.conn = conn
447447
bc.mtx.Unlock()
448448

449-
if err := bc.pipe.CopyToWebsocket(end, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
449+
if _, err := bc.pipe.CopyToWebsocket(end, conn); err != nil {
450450
log.Errorf("%s: Client bridge connection; Error copying pipe to websocket: %v", bc.key, err)
451451
}
452452
conn.Close()

app/pipes.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,11 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
6767
}
6868
defer conn.Close()
6969

70-
if err := pipe.CopyToWebsocket(endIO, conn); err != nil {
70+
if _, err := pipe.CopyToWebsocket(endIO, conn); err != nil {
7171
if span := opentracing.SpanFromContext(ctx); span != nil {
7272
span.LogKV("error", err.Error())
7373
}
74-
if !xfer.IsExpectedWSCloseError(err) {
75-
log.Errorf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
76-
}
74+
log.Errorf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
7775
}
7876
}
7977
}

common/xfer/pipes.go

+18-14
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
// to the UI.
1212
type Pipe interface {
1313
Ends() (io.ReadWriter, io.ReadWriter)
14-
CopyToWebsocket(io.ReadWriter, Websocket) error
14+
CopyToWebsocket(io.ReadWriter, Websocket) (bool, error)
1515

1616
Close() error
1717
Closed() bool
@@ -99,27 +99,26 @@ func (p *pipe) OnClose(f func()) {
9999
}
100100

101101
// CopyToWebsocket copies pipe data to/from a websocket. It blocks.
102-
func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
102+
// Returns bool 'done' and an error, masked if websocket closed in an expected manner.
103+
func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) (bool, error) {
103104
p.mtx.Lock()
104105
if p.closed {
105106
p.mtx.Unlock()
106-
return nil
107+
return true, nil
107108
}
108109
p.wg.Add(1)
109110
p.mtx.Unlock()
110111
defer p.wg.Done()
111112

112-
// The goroutines below both post their errors to the channel, but if you close()
113-
// the pipe before any errors then the pipe may not get read from. Therefore it
114-
// needs up to 2 slots free.
115-
errors := make(chan error, 2)
113+
endError := make(chan error, 1)
114+
connError := make(chan error, 1)
116115

117116
// Read-from-UI loop
118117
go func() {
119118
for {
120119
_, buf, err := conn.ReadMessage() // TODO type should be binary message
121120
if err != nil {
122-
errors <- err
121+
connError <- err
123122
return
124123
}
125124

@@ -128,7 +127,7 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
128127
}
129128

130129
if _, err := end.Write(buf); err != nil {
131-
errors <- err
130+
endError <- err
132131
return
133132
}
134133
}
@@ -140,7 +139,7 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
140139
for {
141140
n, err := end.Read(buf)
142141
if err != nil {
143-
errors <- err
142+
endError <- err
144143
return
145144
}
146145

@@ -149,7 +148,7 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
149148
}
150149

151150
if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
152-
errors <- err
151+
connError <- err
153152
return
154153
}
155154
}
@@ -158,9 +157,14 @@ func (p *pipe) CopyToWebsocket(end io.ReadWriter, conn Websocket) error {
158157
// block until one of the goroutines exits
159158
// this convoluted mechanism is to ensure we only close the websocket once.
160159
select {
161-
case err := <-errors:
162-
return err
160+
case err := <-endError:
161+
return false, err
162+
case err := <-connError:
163+
if IsExpectedWSCloseError(err) {
164+
return false, nil
165+
}
166+
return false, err
163167
case <-p.quit:
164-
return nil
168+
return true, nil
165169
}
166170
}

probe/appclient/app_client.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,8 @@ func (c *appClient) pipeConnection(id string, pipe xfer.Pipe) (bool, error) {
373373
defer c.closeConn(id)
374374

375375
_, remote := pipe.Ends()
376-
if err := pipe.CopyToWebsocket(remote, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
377-
return false, err
378-
}
379-
return false, nil
376+
done, err := pipe.CopyToWebsocket(remote, conn)
377+
return done, err
380378
}
381379

382380
func (c *appClient) PipeConnection(id string, pipe xfer.Pipe) {

probe/docker/controls_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ func TestControls(t *testing.T) {
4646

4747
type mockPipe struct{}
4848

49-
func (mockPipe) Ends() (io.ReadWriter, io.ReadWriter) { return nil, nil }
50-
func (mockPipe) CopyToWebsocket(io.ReadWriter, xfer.Websocket) error { return nil }
51-
func (mockPipe) Close() error { return nil }
52-
func (mockPipe) Closed() bool { return false }
53-
func (mockPipe) OnClose(func()) {}
49+
func (mockPipe) Ends() (io.ReadWriter, io.ReadWriter) { return nil, nil }
50+
func (mockPipe) CopyToWebsocket(io.ReadWriter, xfer.Websocket) (bool, error) { return true, nil }
51+
func (mockPipe) Close() error { return nil }
52+
func (mockPipe) Closed() bool { return false }
53+
func (mockPipe) OnClose(func()) {}
5454

5555
func TestPipes(t *testing.T) {
5656
oldNewPipe := controls.NewPipe

0 commit comments

Comments
 (0)