Skip to content

Commit 3fa3b8d

Browse files
feat: add usbredir
Signed-off-by: Yaroslav Borbat <[email protected]>
1 parent 67c9b12 commit 3fa3b8d

File tree

5 files changed

+192
-9
lines changed

5 files changed

+192
-9
lines changed

api/client/kubeclient/async.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,7 @@ func asyncSubresourceHelper(
127127
case err = <-errChan:
128128
return nil, err
129129
case ws := <-aws.Connection:
130-
return &wsStreamer{
131-
conn: ws,
132-
done: done,
133-
}, nil
130+
return newWebsocketStreamer(ws, done), nil
134131
}
135132
}
136133

api/client/kubeclient/streamer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (c *wsConn) SetDeadline(t time.Time) error {
7575
return c.Conn.SetReadDeadline(t)
7676
}
7777

78-
func NewWebsocketStreamer(conn *websocket.Conn, done chan struct{}) *wsStreamer {
78+
func newWebsocketStreamer(conn *websocket.Conn, done chan struct{}) *wsStreamer {
7979
return &wsStreamer{
8080
conn: conn,
8181
done: done,
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package redirclient
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net"
8+
"os/exec"
9+
"time"
10+
11+
"github.com/deckhouse/virtualization/api/client/generated/clientset/versioned/typed/core/v1alpha2"
12+
)
13+
14+
const usbRedirectClient string = "usbredirect"
15+
16+
type ClientConnectFn func(ctx context.Context, device, address string) error
17+
18+
type Client struct {
19+
// To connect local USB device buffer to the remote VM using the websocket.
20+
inputReader *io.PipeReader
21+
inputWriter *io.PipeWriter
22+
outputReader *io.PipeReader
23+
outputWriter *io.PipeWriter
24+
25+
listener *net.TCPListener
26+
27+
// channels
28+
done chan struct{}
29+
stream chan error
30+
local chan error
31+
remote chan error
32+
33+
ctx context.Context
34+
35+
ClientConnect ClientConnectFn
36+
}
37+
38+
func NewUSBRedirClient(ctx context.Context, address string, stream v1alpha2.StreamInterface) (*Client, error) {
39+
inReader, inWriter := io.Pipe()
40+
outReader, outWriter := io.Pipe()
41+
k := &Client{
42+
ctx: ctx,
43+
inputReader: inReader,
44+
inputWriter: inWriter,
45+
outputReader: outReader,
46+
outputWriter: outWriter,
47+
ClientConnect: clientConnect,
48+
}
49+
50+
// Create local TCP server for usbredir client to connect
51+
if err := k.withLocalTCPClient(address); err != nil {
52+
return nil, err
53+
}
54+
55+
// Start stream with remote usbredir endpoint
56+
k.withRemoteVMIStream(stream)
57+
58+
// Connects data from local usbredir data to remote usbredir endpoint
59+
k.proxyUSBRedir()
60+
61+
return k, nil
62+
}
63+
64+
func (k *Client) withRemoteVMIStream(usbredirStream v1alpha2.StreamInterface) {
65+
k.stream = make(chan error)
66+
67+
go func() {
68+
defer k.outputWriter.Close()
69+
select {
70+
case k.stream <- usbredirStream.Stream(
71+
v1alpha2.StreamOptions{
72+
In: k.inputReader,
73+
Out: k.outputWriter,
74+
},
75+
):
76+
case <-k.ctx.Done():
77+
}
78+
}()
79+
}
80+
81+
func (k *Client) withLocalTCPClient(address string) error {
82+
lnAddr, err := net.ResolveTCPAddr("tcp", address)
83+
if err != nil {
84+
return fmt.Errorf("Can't resolve the address: %s", err.Error())
85+
}
86+
87+
// The local tcp server is used to proxy between remote websocket and local USB
88+
k.listener, err = net.ListenTCP("tcp", lnAddr)
89+
if err != nil {
90+
return fmt.Errorf("Can't listen: %s", err.Error())
91+
}
92+
93+
return nil
94+
}
95+
96+
func (k *Client) proxyUSBRedir() {
97+
// forward data to/from websocket after usbredir client connects.
98+
k.done = make(chan struct{}, 1)
99+
k.remote = make(chan error)
100+
go func() {
101+
defer k.inputWriter.Close()
102+
start := time.Now()
103+
104+
usbredirConn, err := k.listener.Accept()
105+
if err != nil {
106+
fmt.Printf("Failed to accept connection: %s\n", err.Error())
107+
k.remote <- err
108+
return
109+
}
110+
defer usbredirConn.Close()
111+
112+
fmt.Printf("Connected to %s at %v\n", usbRedirectClient, time.Now().Sub(start))
113+
114+
stream := make(chan error)
115+
// write to local usbredir from pipeOutReader
116+
go func() {
117+
_, err := io.Copy(usbredirConn, k.outputReader)
118+
stream <- err
119+
}()
120+
121+
// read from local usbredir towards pipeInWriter
122+
go func() {
123+
_, err := io.Copy(k.inputWriter, usbredirConn)
124+
stream <- err
125+
}()
126+
127+
select {
128+
case <-k.done: // Wait for local usbredir to complete
129+
case err = <-stream: // Wait for remote connection to close
130+
if err == nil {
131+
// Remote connection closed, report this as error
132+
err = fmt.Errorf("Remote connection has closed.")
133+
}
134+
// Wait for local usbredir to complete
135+
k.remote <- err
136+
case <-k.ctx.Done():
137+
}
138+
}()
139+
}
140+
141+
func clientConnect(ctx context.Context, device, address string) error {
142+
bin := usbRedirectClient
143+
args := []string{}
144+
args = append(args, "--device", device, "--to", address)
145+
146+
fmt.Printf("port_arg: '%s'\n", address)
147+
fmt.Printf("args: '%v'\n", args)
148+
fmt.Printf("Executing commandline: '%s %v'\n", bin, args)
149+
150+
command := exec.CommandContext(ctx, bin, args...)
151+
output, err := command.CombinedOutput()
152+
153+
fmt.Printf("%v output: %v", bin, string(output))
154+
155+
return err
156+
}
157+
158+
func (k *Client) Redirect(device string) error {
159+
// execute local usbredir binary
160+
address := k.listener.Addr().String()
161+
k.local = make(chan error)
162+
go func() {
163+
defer close(k.done)
164+
k.local <- k.ClientConnect(k.ctx, device, address)
165+
}()
166+
167+
var err error
168+
select {
169+
case err = <-k.stream:
170+
case err = <-k.local:
171+
case err = <-k.remote:
172+
case <-k.ctx.Done():
173+
err = k.ctx.Err()
174+
175+
}
176+
return err
177+
}

src/cli/internal/cmd/usbredir/usbredir.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/spf13/pflag"
2525

2626
"github.com/deckhouse/virtualization/src/cli/internal/clientconfig"
27+
"github.com/deckhouse/virtualization/src/cli/internal/cmd/usbredir/redirclient"
2728
"github.com/deckhouse/virtualization/src/cli/internal/templates"
2829
)
2930

@@ -105,8 +106,14 @@ func (c *USBRedir) Run(cmd *cobra.Command, args []string) error {
105106
return err
106107
}
107108

108-
redir := NewClient(stream, newUsbRedirector(usbRedirectClient))
109-
device := fmt.Sprintf("%s-%s", c.bus, c.device)
110-
111-
return redir.Redirect(cmd.Context(), device)
109+
if usbRedirClient, err := redirclient.NewUSBRedirClient(cmd.Context(), "localhost:0", stream); err != nil {
110+
return err
111+
} else {
112+
device := fmt.Sprintf("%s-%s", c.bus, c.device)
113+
return usbRedirClient.Redirect(device)
114+
}
115+
//redir := NewClient(stream, newUsbRedirector(usbRedirectClient))
116+
//device := fmt.Sprintf("%s-%s", c.bus, c.device)
117+
//
118+
//return redir.Redirect(cmd.Context(), device)
112119
}

templates/virtualization-api/rbac-for-us.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ rules:
7272
- virtualmachineinstances/unfreeze
7373
- virtualmachineinstances/addvolume
7474
- virtualmachineinstances/removevolume
75+
- virtualmachineinstances/usbredir
7576
verbs:
7677
- get
7778
- patch
@@ -100,6 +101,7 @@ rules:
100101
- virtualmachines/removevolume
101102
- virtualmachines/unfreeze
102103
- virtualmachines/vnc
104+
- virtualmachines/usbredir
103105
verbs:
104106
- get
105107
- patch

0 commit comments

Comments
 (0)