From 050436114602187c3bc73daa77e8f9e578a403cb Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 1 Dec 2021 21:48:55 +0100 Subject: [PATCH 1/8] use udpMux to discover srflx candidates --- agent.go | 19 ++++- agent_config.go | 5 ++ gather.go | 68 ++++++++++++++++- udp_mux.go | 197 +++++++++++++++++++++++++++++++++++++++++++++--- udp_mux_test.go | 90 ++++++++++++++++++++++ 5 files changed, 363 insertions(+), 16 deletions(-) diff --git a/agent.go b/agent.go index b4aba19f..dcb55fe9 100644 --- a/agent.go +++ b/agent.go @@ -122,9 +122,10 @@ type Agent struct { loggerFactory logging.LoggerFactory log logging.LeveledLogger - net *vnet.Net - tcpMux TCPMux - udpMux UDPMux + net *vnet.Net + tcpMux TCPMux + udpMux UDPMux + udpMuxSrflx UDPMux interfaceFilter func(string) bool @@ -319,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit a.tcpMux = newInvalidTCPMux() } a.udpMux = config.UDPMux + a.udpMuxSrflx = config.UDPMuxSrflx if a.net == nil { a.net = vnet.NewNet(nil) @@ -892,6 +894,9 @@ func (a *Agent) removeUfragFromMux() { if a.udpMux != nil { a.udpMux.RemoveConnByUfrag(a.localUfrag) } + if a.udpMuxSrflx != nil { + a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag) + } } // Close cleans up the Agent @@ -905,7 +910,13 @@ func (a *Agent) Close() error { }) a.err.Store(ErrClosed) - a.removeUfragFromMux() + a.tcpMux.RemoveConnByUfrag(a.localUfrag) + if a.udpMux != nil { + a.udpMux.RemoveConnByUfrag(a.localUfrag) + } + if a.udpMuxSrflx != nil { + a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag) + } close(a.done) <-a.taskLoopDone diff --git a/agent_config.go b/agent_config.go index e577af10..57558136 100644 --- a/agent_config.go +++ b/agent_config.go @@ -150,6 +150,11 @@ type AgentConfig struct { // defer to UDPMux for incoming connections UDPMux UDPMux + // UDPMux is used for multiplexing multiple incoming UDP connections on a single port + // when this is set, the agent ignores PortMin and PortMax configurations and will + // defer to UDPMux for incoming connections + UDPMuxSrflx UDPMux + // Proxy Dialer is a dialer that should be implemented by the user based on golang.org/x/net/proxy // dial interface in order to support corporate proxies ProxyDialer proxy.Dialer diff --git a/gather.go b/gather.go index e248c08d..2e4d5c86 100644 --- a/gather.go +++ b/gather.go @@ -97,7 +97,11 @@ func (a *Agent) gatherCandidates(ctx context.Context) { case CandidateTypeServerReflexive: wg.Add(1) go func() { - a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes) + if a.udpMuxSrflx != nil { + a.gatherCandidatesSrflxUDPMux(ctx, a.urls, a.networkTypes) + } else { + a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes) + } wg.Done() }() if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeServerReflexive { @@ -333,6 +337,68 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes [] } } +func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, networkTypes []NetworkType) { + var wg sync.WaitGroup + defer wg.Wait() + + for _, networkType := range networkTypes { + if networkType.IsTCP() { + continue + } + + for i := range urls { + wg.Add(1) + go func(url URL, network string) { + defer wg.Done() + + hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port) + serverAddr, err := a.net.ResolveUDPAddr(network, hostPort) + if err != nil { + a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err) + return + } + + xoraddr, err := a.udpMuxSrflx.GetXORMappedAddr(serverAddr, stunGatherTimeout) + if err != nil { + a.log.Warnf("could not get server reflexive address %s %s: %v\n", network, url, err) + return + } + + conn, err := a.udpMuxSrflx.GetConn(a.localUfrag) + if err != nil { + a.log.Warnf("could not find local connection in UDPMux %s %s: %v\n", network, url, err) + return + } + + ip := xoraddr.IP + port := xoraddr.Port + + laddr := conn.LocalAddr().(*net.UDPAddr) + srflxConfig := CandidateServerReflexiveConfig{ + Network: network, + Address: ip.String(), + Port: port, + Component: ComponentRTP, + RelAddr: laddr.IP.String(), + RelPort: laddr.Port, + } + c, err := NewCandidateServerReflexive(&srflxConfig) + if err != nil { + closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err)) + return + } + + if err := a.addCandidate(ctx, c, conn); err != nil { + if closeErr := c.close(); closeErr != nil { + a.log.Warnf("Failed to close candidate: %v", closeErr) + } + a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err) + } + }(*urls[i], networkType.String()) + } + } +} + func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkTypes []NetworkType) { var wg sync.WaitGroup defer wg.Wait() diff --git a/udp_mux.go b/udp_mux.go index 0bbe3b5c..f2f0ef94 100644 --- a/udp_mux.go +++ b/udp_mux.go @@ -1,11 +1,14 @@ package ice import ( + "context" + "fmt" "io" "net" "os" "strings" "sync" + "time" "github.com/pion/logging" "github.com/pion/stun" @@ -16,6 +19,7 @@ type UDPMux interface { io.Closer GetConn(ufrag string) (net.PacketConn, error) RemoveConnByUfrag(ufrag string) + GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) } // UDPMuxDefault is an implementation of the interface @@ -35,14 +39,20 @@ type UDPMuxDefault struct { pool *sync.Pool mu sync.Mutex + + // since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents + // todo update it periodically + // stun.XORMappedAddress indexed by the STUN server addr + xorMappedAddr map[string]*xorAddrMap } const maxAddrSize = 512 // UDPMuxParams are parameters for UDPMux. type UDPMuxParams struct { - Logger logging.LeveledLogger - UDPConn net.PacketConn + Logger logging.LeveledLogger + UDPConn net.PacketConn + XORMappedAddrCacheTTL time.Duration } // NewUDPMuxDefault creates an implementation of UDPMux @@ -51,6 +61,10 @@ func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault { params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice") } + if params.XORMappedAddrCacheTTL == 0 { + params.XORMappedAddrCacheTTL = time.Second * 25 + } + m := &UDPMuxDefault{ addressMap: map[string]*udpMuxedConn{}, params: params, @@ -62,6 +76,7 @@ func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault { return newBufferHolder(receiveMTU + maxAddrSize) }, }, + xorMappedAddr: make(map[string]*xorAddrMap), } go m.connWorker() @@ -204,6 +219,40 @@ func (m *UDPMuxDefault) createMuxedConn(key string) *udpMuxedConn { return c } +func (m *UDPMuxDefault) handleRemotePing(msg *stun.Message) (*udpMuxedConn, error) { + attr, err := msg.Get(stun.AttrUsername) + if err != nil { + return nil, err + } + + ufrag := strings.Split(string(attr), ":")[0] + m.mu.Lock() + destinationConn := m.conns[ufrag] + m.mu.Unlock() + return destinationConn, nil +} + +func (m *UDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, msg *stun.Message) error { + m.mu.Lock() + defer m.mu.Unlock() + + mappedAddr, ok := m.xorMappedAddr[stunAddr.String()] + if !ok { + // todo: investigate why remote peers send XORMappedAddr + return fmt.Errorf("no address map for %s", stunAddr.String()) + } + + var addr stun.XORMappedAddress + if err := addr.GetFrom(msg); err != nil { + return err + } + + m.xorMappedAddr[stunAddr.String()] = mappedAddr + mappedAddr.SetAddr(&addr) + + return nil +} + func (m *UDPMuxDefault) connWorker() { logger := m.params.Logger @@ -248,17 +297,21 @@ func (m *UDPMuxDefault) connWorker() { continue } - attr, stunAttrErr := msg.Get(stun.AttrUsername) - if stunAttrErr != nil { - m.params.Logger.Warnf("No Username attribute in STUN message from %s\n", addr.String()) - continue + if isRemotePing(msg) { + destinationConn, err = m.handleRemotePing(msg) + if err != nil { + m.params.Logger.Warnf("No Username attribute in STUN message from %s\n", addr.String()) + continue + } } - ufrag := strings.Split(string(attr), ":")[0] - - m.mu.Lock() - destinationConn = m.conns[ufrag] - m.mu.Unlock() + if isXORMappedResponse(msg) { + err = m.handleXORMappedResponse(udpAddr, msg) + if err != nil { + m.params.Logger.Errorf("%w: %v", errGetXorMappedAddrResponse, err) + } + continue + } } if destinationConn == nil { @@ -272,6 +325,96 @@ func (m *UDPMuxDefault) connWorker() { } } +// isXORMappedResponse indicates whether the message is a XORMappedAddress response from the STUN server +func isXORMappedResponse(msg *stun.Message) bool { + _, err := msg.Get(stun.AttrXORMappedAddress) + return err == nil +} + +// isRemotePing indicates whether the message is a ping from a remote candidate +func isRemotePing(msg *stun.Message) bool { + _, err := msg.Get(stun.AttrUsername) + return err == nil +} + +// GetXORMappedAddr returns *stun.XORMappedAddress if already present for a given STUN server. +// +// Makes a STUN binding request to discover mapped address otherwise. +// Blocks until the response is received. The response will be handled by UDPMuxDefault.connWorker +// Method is safe for concurrent use. +func (m *UDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) { + m.mu.Lock() + mappedAddr, ok := m.xorMappedAddr[serverAddr.String()] + // if we already have a mapping for this STUN server (address already recieved) + // and if it is not too old we return it without making a new request to STUN server + if ok { + if mappedAddr.expired() { + mappedAddr.closeWaiters() + delete(m.xorMappedAddr, serverAddr.String()) + ok = false + } else if mappedAddr.pending() { + ok = false + } + } + m.mu.Unlock() + if ok { + return mappedAddr.addr, nil + } + + // otherwise, make a STUN request to discover the address + // or wait for already sent request to complete + waitAddrReceived, err := m.sendStun(serverAddr) + if err != nil { + return nil, fmt.Errorf("could not send STUN request: %v", err) + } + + // block until response was handled by the connWorker routine and XORMappedAddress was updated + select { + case <-waitAddrReceived: + // when channel closed, addr was obtained + m.mu.Lock() + mappedAddr := *m.xorMappedAddr[serverAddr.String()] + m.mu.Unlock() + if mappedAddr.addr == nil { + return nil, fmt.Errorf("no XORMappedAddress for %s", serverAddr.String()) + } + return mappedAddr.addr, nil + case <-time.After(deadline): + return nil, fmt.Errorf("timeout while waiting for XORMappedAddr") + } +} + +// sendStun sends a STUN request via UDP conn. +// +// The returned channel is closed when the STUN response has been received. +// Method is safe for concurrent use. +func (m *UDPMuxDefault) sendStun(serverAddr net.Addr) (chan struct{}, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // if record present in the map, we already sent a STUN request, + // just wait when waitAddrRecieved will be closed + addrMap, ok := m.xorMappedAddr[serverAddr.String()] + if !ok { + addrMap = &xorAddrMap{ + expiresAt: time.Now().Add(m.params.XORMappedAddrCacheTTL), + waitAddrReceived: make(chan struct{}), + } + m.xorMappedAddr[serverAddr.String()] = addrMap + } + + req, err := stun.Build(stun.BindingRequest, stun.TransactionID) + if err != nil { + return nil, err + } + + if _, err = m.params.UDPConn.WriteTo(req.Raw, serverAddr); err != nil { + return nil, err + } + + return addrMap.waitAddrReceived, nil +} + type bufferHolder struct { buffer []byte } @@ -281,3 +424,35 @@ func newBufferHolder(size int) *bufferHolder { buffer: make([]byte, size), } } + +type xorAddrMap struct { + ctx context.Context + addr *stun.XORMappedAddress + waitAddrReceived chan struct{} + expiresAt time.Time +} + +func (a *xorAddrMap) closeWaiters() { + select { + case <-a.waitAddrReceived: + // notify was close, ok, that means we received duplicate response + // just exit + break + default: + // notify tha twe have a new addr + close(a.waitAddrReceived) + } +} + +func (a *xorAddrMap) pending() bool { + return a.addr == nil +} + +func (a *xorAddrMap) expired() bool { + return a.expiresAt.Before(time.Now()) +} + +func (a *xorAddrMap) SetAddr(addr *stun.XORMappedAddress) { + a.addr = addr + a.closeWaiters() +} diff --git a/udp_mux_test.go b/udp_mux_test.go index 3dd47f9a..b7e27bec 100644 --- a/udp_mux_test.go +++ b/udp_mux_test.go @@ -1,3 +1,4 @@ +//go:build !js // +build !js package ice @@ -60,6 +61,12 @@ func TestUDPMux(t *testing.T) { testMuxConnection(t, udpMux, "ufrag3", "udp6") } + wg.Add(1) + go func() { + defer wg.Done() + testMuxSrflxConnection(t, udpMux, "ufrag4", udp) + }() + wg.Wait() require.NoError(t, udpMux.Close()) @@ -213,6 +220,89 @@ func testMuxConnection(t *testing.T, udpMux *UDPMuxDefault, ufrag string, networ <-remoteReadDone } +func testMuxSrflxConnection(t *testing.T, udpMux *UDPMuxDefault, ufrag string, network string) { + pktConn, err := udpMux.GetConn(ufrag) + require.NoError(t, err, "error retrieving muxed connection for ufrag") + defer func() { + _ = pktConn.Close() + }() + + remoteConn, err := net.DialUDP(network, nil, &net.UDPAddr{ + Port: udpMux.LocalAddr().(*net.UDPAddr).Port, + }) + require.NoError(t, err, "error dialing test udp connection") + defer func() { + _ = remoteConn.Close() + }() + + // use small value for TTL to check expiration of the address + udpMux.params.XORMappedAddrCacheTTL = time.Millisecond * 20 + testXORIP := net.ParseIP("213.141.156.236") + testXORPort := 21254 + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) + require.NoError(t, err) + require.NotNil(t, address) + require.True(t, address.IP.Equal(testXORIP)) + require.Equal(t, address.Port, testXORPort) + }() + + // wait until GetXORMappedAddr calls sendStun method + time.Sleep(time.Millisecond) + + // check that mapped address filled correctly after sent stun + udpMux.mu.Lock() + mappedAddr, ok := udpMux.xorMappedAddr[remoteConn.LocalAddr().String()] + udpMux.mu.Unlock() + require.True(t, ok) + require.NotNil(t, mappedAddr) + require.True(t, mappedAddr.pending()) + require.False(t, mappedAddr.expired()) + + // clean receiver read buffer + buf := make([]byte, receiveMTU) + _, err = remoteConn.Read(buf) + require.NoError(t, err) + + // write back to udpMux XOR message with address + msg := stun.New() + msg.Type = stun.MessageType{Method: stun.MethodBinding, Class: stun.ClassRequest} + msg.Add(stun.AttrUsername, []byte(ufrag+":otherufrag")) + addr := &stun.XORMappedAddress{ + IP: testXORIP, + Port: testXORPort, + } + addr.AddTo(msg) + msg.Encode() + _, err = remoteConn.Write(msg.Raw) + require.NoError(t, err) + + // wait for the packet to be consumed and parsed by udpMux + wg.Wait() + + // we should get address immediately from the cached map + address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) + require.NoError(t, err) + require.NotNil(t, address) + + // check mappedAddr is not pending, we didn't send stun twice + require.False(t, mappedAddr.pending()) + + // check expiration by TTL + time.Sleep(time.Millisecond * 21) + require.True(t, mappedAddr.expired()) + + // after expire, we send stun request again + // but we not receive response in 5 milliseconds and should get error here + address, err = udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Millisecond*5) + require.NotNil(t, err) + require.Nil(t, address) +} + func verifyPacket(t *testing.T, b []byte, nextSeq uint32) { readSeq := binary.LittleEndian.Uint32(b[0:4]) require.Equal(t, nextSeq, readSeq) From 8fd13cd7148541d462910612457164390af92f3e Mon Sep 17 00:00:00 2001 From: Givi Khojanashvili Date: Wed, 9 Feb 2022 22:23:48 +0400 Subject: [PATCH 2/8] Review fixes for server reflexive UDPMux --- agent.go | 8 +------- gather.go | 2 +- udp_mux.go | 2 -- udp_mux_test.go | 4 +++- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/agent.go b/agent.go index dcb55fe9..3469fc48 100644 --- a/agent.go +++ b/agent.go @@ -910,13 +910,7 @@ func (a *Agent) Close() error { }) a.err.Store(ErrClosed) - a.tcpMux.RemoveConnByUfrag(a.localUfrag) - if a.udpMux != nil { - a.udpMux.RemoveConnByUfrag(a.localUfrag) - } - if a.udpMuxSrflx != nil { - a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag) - } + a.removeUfragFromMux() close(a.done) <-a.taskLoopDone diff --git a/gather.go b/gather.go index 2e4d5c86..cd7f7d21 100644 --- a/gather.go +++ b/gather.go @@ -366,7 +366,7 @@ func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, ne conn, err := a.udpMuxSrflx.GetConn(a.localUfrag) if err != nil { - a.log.Warnf("could not find local connection in UDPMux %s %s: %v\n", network, url, err) + a.log.Warnf("could not find connection in UDPMuxSrflx %s %s: %v\n", network, url, err) return } diff --git a/udp_mux.go b/udp_mux.go index f2f0ef94..4816ca36 100644 --- a/udp_mux.go +++ b/udp_mux.go @@ -41,7 +41,6 @@ type UDPMuxDefault struct { mu sync.Mutex // since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents - // todo update it periodically // stun.XORMappedAddress indexed by the STUN server addr xorMappedAddr map[string]*xorAddrMap } @@ -238,7 +237,6 @@ func (m *UDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, msg *stun mappedAddr, ok := m.xorMappedAddr[stunAddr.String()] if !ok { - // todo: investigate why remote peers send XORMappedAddr return fmt.Errorf("no address map for %s", stunAddr.String()) } diff --git a/udp_mux_test.go b/udp_mux_test.go index b7e27bec..9edcdd6c 100644 --- a/udp_mux_test.go +++ b/udp_mux_test.go @@ -257,11 +257,11 @@ func testMuxSrflxConnection(t *testing.T, udpMux *UDPMuxDefault, ufrag string, n // check that mapped address filled correctly after sent stun udpMux.mu.Lock() mappedAddr, ok := udpMux.xorMappedAddr[remoteConn.LocalAddr().String()] - udpMux.mu.Unlock() require.True(t, ok) require.NotNil(t, mappedAddr) require.True(t, mappedAddr.pending()) require.False(t, mappedAddr.expired()) + udpMux.mu.Unlock() // clean receiver read buffer buf := make([]byte, receiveMTU) @@ -289,12 +289,14 @@ func testMuxSrflxConnection(t *testing.T, udpMux *UDPMuxDefault, ufrag string, n require.NoError(t, err) require.NotNil(t, address) + udpMux.mu.Lock() // check mappedAddr is not pending, we didn't send stun twice require.False(t, mappedAddr.pending()) // check expiration by TTL time.Sleep(time.Millisecond * 21) require.True(t, mappedAddr.expired()) + udpMux.mu.Unlock() // after expire, we send stun request again // but we not receive response in 5 milliseconds and should get error here From 138db20d36ad19d6561cb6f4739ca4946fea51f6 Mon Sep 17 00:00:00 2001 From: Mikhail Bragin Date: Wed, 16 Feb 2022 15:47:53 +0100 Subject: [PATCH 3/8] Add UniversalUDPMux to enable single port ICE for srflx candidates UDPMux only works for the HOST candidates. UniversalUDPMux adds support for SRFLX candidates and will later support Relay candidates. UniversalUDPMux embeds UDPMux and overrides the actual PacketConn's ReadFrom to handle STUN server packets. Then it forwards packets further to UDPMux. --- agent.go | 2 +- agent_config.go | 2 +- gather.go | 5 +- go.mod | 2 +- go.sum | 4 +- udp_mux.go | 205 +++-------------------------- udp_mux_test.go | 92 ------------- udp_mux_universal.go | 262 ++++++++++++++++++++++++++++++++++++++ udp_mux_universal_test.go | 123 ++++++++++++++++++ 9 files changed, 412 insertions(+), 285 deletions(-) create mode 100644 udp_mux_universal.go create mode 100644 udp_mux_universal_test.go diff --git a/agent.go b/agent.go index 3469fc48..aa2a1d6f 100644 --- a/agent.go +++ b/agent.go @@ -125,7 +125,7 @@ type Agent struct { net *vnet.Net tcpMux TCPMux udpMux UDPMux - udpMuxSrflx UDPMux + udpMuxSrflx UniversalUDPMux interfaceFilter func(string) bool diff --git a/agent_config.go b/agent_config.go index 57558136..355e05ae 100644 --- a/agent_config.go +++ b/agent_config.go @@ -153,7 +153,7 @@ type AgentConfig struct { // UDPMux is used for multiplexing multiple incoming UDP connections on a single port // when this is set, the agent ignores PortMin and PortMax configurations and will // defer to UDPMux for incoming connections - UDPMuxSrflx UDPMux + UDPMuxSrflx UniversalUDPMux // Proxy Dialer is a dialer that should be implemented by the user based on golang.org/x/net/proxy // dial interface in order to support corporate proxies diff --git a/gather.go b/gather.go index cd7f7d21..8bb1f96f 100644 --- a/gather.go +++ b/gather.go @@ -347,6 +347,9 @@ func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, ne } for i := range urls { + if !(urls[i].Scheme == SchemeTypeSTUN || urls[i].Scheme == SchemeTypeSTUNS) { + continue + } wg.Add(1) go func(url URL, network string) { defer wg.Done() @@ -364,7 +367,7 @@ func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, ne return } - conn, err := a.udpMuxSrflx.GetConn(a.localUfrag) + conn, err := a.udpMuxSrflx.GetConnForURL(a.localUfrag, url.String()) if err != nil { a.log.Warnf("could not find connection in UDPMuxSrflx %s %s: %v\n", network, url, err) return diff --git a/go.mod b/go.mod index 750e80e3..35b8113f 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/pion/randutil v0.1.0 github.com/pion/stun v0.3.5 github.com/pion/transport v0.13.0 - github.com/pion/turn/v2 v2.0.6 + github.com/pion/turn/v2 v2.0.7 github.com/stretchr/testify v1.7.0 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index d591a889..9e90febc 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2U github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q= github.com/pion/transport v0.13.0 h1:KWTA5ZrQogizzYwPEciGtHPLwpAjE91FgXnyu+Hv2uY= github.com/pion/transport v0.13.0/go.mod h1:yxm9uXpK9bpBBWkITk13cLo1y5/ur5VQpG22ny6EP7g= -github.com/pion/turn/v2 v2.0.6 h1:AsXjSPR6Im15DMTB39NlfdTY9BQfieANPBjdg/aVNwY= -github.com/pion/turn/v2 v2.0.6/go.mod h1:+y7xl719J8bAEVpSXBXvTxStjJv3hbz9YFflvkpcGPw= +github.com/pion/turn/v2 v2.0.7 h1:SZhc00WDovK6czaN1RSiHqbwANtIO6wfZQsU0m0KNE8= +github.com/pion/turn/v2 v2.0.7/go.mod h1:+y7xl719J8bAEVpSXBXvTxStjJv3hbz9YFflvkpcGPw= github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/udp_mux.go b/udp_mux.go index 4816ca36..f8fdd98a 100644 --- a/udp_mux.go +++ b/udp_mux.go @@ -1,17 +1,14 @@ package ice import ( - "context" - "fmt" + "github.com/pion/stun" "io" "net" "os" "strings" "sync" - "time" "github.com/pion/logging" - "github.com/pion/stun" ) // UDPMux allows multiple connections to go over a single UDP port @@ -19,7 +16,6 @@ type UDPMux interface { io.Closer GetConn(ufrag string) (net.PacketConn, error) RemoveConnByUfrag(ufrag string) - GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) } // UDPMuxDefault is an implementation of the interface @@ -39,19 +35,14 @@ type UDPMuxDefault struct { pool *sync.Pool mu sync.Mutex - - // since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents - // stun.XORMappedAddress indexed by the STUN server addr - xorMappedAddr map[string]*xorAddrMap } const maxAddrSize = 512 // UDPMuxParams are parameters for UDPMux. type UDPMuxParams struct { - Logger logging.LeveledLogger - UDPConn net.PacketConn - XORMappedAddrCacheTTL time.Duration + Logger logging.LeveledLogger + UDPConn net.PacketConn } // NewUDPMuxDefault creates an implementation of UDPMux @@ -60,10 +51,6 @@ func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault { params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice") } - if params.XORMappedAddrCacheTTL == 0 { - params.XORMappedAddrCacheTTL = time.Second * 25 - } - m := &UDPMuxDefault{ addressMap: map[string]*udpMuxedConn{}, params: params, @@ -75,7 +62,6 @@ func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault { return newBufferHolder(receiveMTU + maxAddrSize) }, }, - xorMappedAddr: make(map[string]*xorAddrMap), } go m.connWorker() @@ -111,12 +97,16 @@ func (m *UDPMuxDefault) GetConn(ufrag string) (net.PacketConn, error) { return c, nil } -// RemoveConnByUfrag stops and removes the muxed packet connection +// RemoveConnByUfrag stops and removes the all muxed packet connections with a ufrag prefix func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) { + // it happens probably when we close not yet initialized agent + if ufrag == "" { + return + } m.mu.Lock() removedConns := make([]*udpMuxedConn, 0) for key := range m.conns { - if key != ufrag { + if !strings.HasPrefix(key, ufrag) { continue } @@ -218,39 +208,6 @@ func (m *UDPMuxDefault) createMuxedConn(key string) *udpMuxedConn { return c } -func (m *UDPMuxDefault) handleRemotePing(msg *stun.Message) (*udpMuxedConn, error) { - attr, err := msg.Get(stun.AttrUsername) - if err != nil { - return nil, err - } - - ufrag := strings.Split(string(attr), ":")[0] - m.mu.Lock() - destinationConn := m.conns[ufrag] - m.mu.Unlock() - return destinationConn, nil -} - -func (m *UDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, msg *stun.Message) error { - m.mu.Lock() - defer m.mu.Unlock() - - mappedAddr, ok := m.xorMappedAddr[stunAddr.String()] - if !ok { - return fmt.Errorf("no address map for %s", stunAddr.String()) - } - - var addr stun.XORMappedAddress - if err := addr.GetFrom(msg); err != nil { - return err - } - - m.xorMappedAddr[stunAddr.String()] = mappedAddr - mappedAddr.SetAddr(&addr) - - return nil -} - func (m *UDPMuxDefault) connWorker() { logger := m.params.Logger @@ -295,21 +252,17 @@ func (m *UDPMuxDefault) connWorker() { continue } - if isRemotePing(msg) { - destinationConn, err = m.handleRemotePing(msg) - if err != nil { - m.params.Logger.Warnf("No Username attribute in STUN message from %s\n", addr.String()) - continue - } - } - - if isXORMappedResponse(msg) { - err = m.handleXORMappedResponse(udpAddr, msg) - if err != nil { - m.params.Logger.Errorf("%w: %v", errGetXorMappedAddrResponse, err) - } + attr, stunAttrErr := msg.Get(stun.AttrUsername) + if stunAttrErr != nil { + m.params.Logger.Warnf("No Username attribute in STUN message from %s\n", addr.String()) continue } + + ufrag := strings.Split(string(attr), ":")[0] + + m.mu.Lock() + destinationConn = m.conns[ufrag] + m.mu.Unlock() } if destinationConn == nil { @@ -323,96 +276,6 @@ func (m *UDPMuxDefault) connWorker() { } } -// isXORMappedResponse indicates whether the message is a XORMappedAddress response from the STUN server -func isXORMappedResponse(msg *stun.Message) bool { - _, err := msg.Get(stun.AttrXORMappedAddress) - return err == nil -} - -// isRemotePing indicates whether the message is a ping from a remote candidate -func isRemotePing(msg *stun.Message) bool { - _, err := msg.Get(stun.AttrUsername) - return err == nil -} - -// GetXORMappedAddr returns *stun.XORMappedAddress if already present for a given STUN server. -// -// Makes a STUN binding request to discover mapped address otherwise. -// Blocks until the response is received. The response will be handled by UDPMuxDefault.connWorker -// Method is safe for concurrent use. -func (m *UDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) { - m.mu.Lock() - mappedAddr, ok := m.xorMappedAddr[serverAddr.String()] - // if we already have a mapping for this STUN server (address already recieved) - // and if it is not too old we return it without making a new request to STUN server - if ok { - if mappedAddr.expired() { - mappedAddr.closeWaiters() - delete(m.xorMappedAddr, serverAddr.String()) - ok = false - } else if mappedAddr.pending() { - ok = false - } - } - m.mu.Unlock() - if ok { - return mappedAddr.addr, nil - } - - // otherwise, make a STUN request to discover the address - // or wait for already sent request to complete - waitAddrReceived, err := m.sendStun(serverAddr) - if err != nil { - return nil, fmt.Errorf("could not send STUN request: %v", err) - } - - // block until response was handled by the connWorker routine and XORMappedAddress was updated - select { - case <-waitAddrReceived: - // when channel closed, addr was obtained - m.mu.Lock() - mappedAddr := *m.xorMappedAddr[serverAddr.String()] - m.mu.Unlock() - if mappedAddr.addr == nil { - return nil, fmt.Errorf("no XORMappedAddress for %s", serverAddr.String()) - } - return mappedAddr.addr, nil - case <-time.After(deadline): - return nil, fmt.Errorf("timeout while waiting for XORMappedAddr") - } -} - -// sendStun sends a STUN request via UDP conn. -// -// The returned channel is closed when the STUN response has been received. -// Method is safe for concurrent use. -func (m *UDPMuxDefault) sendStun(serverAddr net.Addr) (chan struct{}, error) { - m.mu.Lock() - defer m.mu.Unlock() - - // if record present in the map, we already sent a STUN request, - // just wait when waitAddrRecieved will be closed - addrMap, ok := m.xorMappedAddr[serverAddr.String()] - if !ok { - addrMap = &xorAddrMap{ - expiresAt: time.Now().Add(m.params.XORMappedAddrCacheTTL), - waitAddrReceived: make(chan struct{}), - } - m.xorMappedAddr[serverAddr.String()] = addrMap - } - - req, err := stun.Build(stun.BindingRequest, stun.TransactionID) - if err != nil { - return nil, err - } - - if _, err = m.params.UDPConn.WriteTo(req.Raw, serverAddr); err != nil { - return nil, err - } - - return addrMap.waitAddrReceived, nil -} - type bufferHolder struct { buffer []byte } @@ -422,35 +285,3 @@ func newBufferHolder(size int) *bufferHolder { buffer: make([]byte, size), } } - -type xorAddrMap struct { - ctx context.Context - addr *stun.XORMappedAddress - waitAddrReceived chan struct{} - expiresAt time.Time -} - -func (a *xorAddrMap) closeWaiters() { - select { - case <-a.waitAddrReceived: - // notify was close, ok, that means we received duplicate response - // just exit - break - default: - // notify tha twe have a new addr - close(a.waitAddrReceived) - } -} - -func (a *xorAddrMap) pending() bool { - return a.addr == nil -} - -func (a *xorAddrMap) expired() bool { - return a.expiresAt.Before(time.Now()) -} - -func (a *xorAddrMap) SetAddr(addr *stun.XORMappedAddress) { - a.addr = addr - a.closeWaiters() -} diff --git a/udp_mux_test.go b/udp_mux_test.go index 9edcdd6c..67121de1 100644 --- a/udp_mux_test.go +++ b/udp_mux_test.go @@ -60,13 +60,6 @@ func TestUDPMux(t *testing.T) { if ptrSize != 32 { testMuxConnection(t, udpMux, "ufrag3", "udp6") } - - wg.Add(1) - go func() { - defer wg.Done() - testMuxSrflxConnection(t, udpMux, "ufrag4", udp) - }() - wg.Wait() require.NoError(t, udpMux.Close()) @@ -220,91 +213,6 @@ func testMuxConnection(t *testing.T, udpMux *UDPMuxDefault, ufrag string, networ <-remoteReadDone } -func testMuxSrflxConnection(t *testing.T, udpMux *UDPMuxDefault, ufrag string, network string) { - pktConn, err := udpMux.GetConn(ufrag) - require.NoError(t, err, "error retrieving muxed connection for ufrag") - defer func() { - _ = pktConn.Close() - }() - - remoteConn, err := net.DialUDP(network, nil, &net.UDPAddr{ - Port: udpMux.LocalAddr().(*net.UDPAddr).Port, - }) - require.NoError(t, err, "error dialing test udp connection") - defer func() { - _ = remoteConn.Close() - }() - - // use small value for TTL to check expiration of the address - udpMux.params.XORMappedAddrCacheTTL = time.Millisecond * 20 - testXORIP := net.ParseIP("213.141.156.236") - testXORPort := 21254 - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) - require.NoError(t, err) - require.NotNil(t, address) - require.True(t, address.IP.Equal(testXORIP)) - require.Equal(t, address.Port, testXORPort) - }() - - // wait until GetXORMappedAddr calls sendStun method - time.Sleep(time.Millisecond) - - // check that mapped address filled correctly after sent stun - udpMux.mu.Lock() - mappedAddr, ok := udpMux.xorMappedAddr[remoteConn.LocalAddr().String()] - require.True(t, ok) - require.NotNil(t, mappedAddr) - require.True(t, mappedAddr.pending()) - require.False(t, mappedAddr.expired()) - udpMux.mu.Unlock() - - // clean receiver read buffer - buf := make([]byte, receiveMTU) - _, err = remoteConn.Read(buf) - require.NoError(t, err) - - // write back to udpMux XOR message with address - msg := stun.New() - msg.Type = stun.MessageType{Method: stun.MethodBinding, Class: stun.ClassRequest} - msg.Add(stun.AttrUsername, []byte(ufrag+":otherufrag")) - addr := &stun.XORMappedAddress{ - IP: testXORIP, - Port: testXORPort, - } - addr.AddTo(msg) - msg.Encode() - _, err = remoteConn.Write(msg.Raw) - require.NoError(t, err) - - // wait for the packet to be consumed and parsed by udpMux - wg.Wait() - - // we should get address immediately from the cached map - address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) - require.NoError(t, err) - require.NotNil(t, address) - - udpMux.mu.Lock() - // check mappedAddr is not pending, we didn't send stun twice - require.False(t, mappedAddr.pending()) - - // check expiration by TTL - time.Sleep(time.Millisecond * 21) - require.True(t, mappedAddr.expired()) - udpMux.mu.Unlock() - - // after expire, we send stun request again - // but we not receive response in 5 milliseconds and should get error here - address, err = udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Millisecond*5) - require.NotNil(t, err) - require.Nil(t, address) -} - func verifyPacket(t *testing.T, b []byte, nextSeq uint32) { readSeq := binary.LittleEndian.Uint32(b[0:4]) require.Equal(t, nextSeq, readSeq) diff --git a/udp_mux_universal.go b/udp_mux_universal.go new file mode 100644 index 00000000..ea9c5e79 --- /dev/null +++ b/udp_mux_universal.go @@ -0,0 +1,262 @@ +package ice + +import ( + "fmt" + "github.com/pion/logging" + "github.com/pion/stun" + "net" + "time" +) + +// UniversalUDPMux allows multiple connections to go over a single UDP port for +// host, server reflexive and relayed candidates. +// Actual connection muxing is happening in the UDPMux. +type UniversalUDPMux interface { + UDPMux + GetXORMappedAddr(stunAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) + GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error) + GetConnForURL(ufrag string, url string) (net.PacketConn, error) +} + +// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn overriding ReadFrom. +// It the passes packets to the UDPMux that does the actual connection muxing. +type UniversalUDPMuxDefault struct { + *UDPMuxDefault + params UniversalUDPMuxParams + + // since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents + // stun.XORMappedAddress indexed by the STUN server addr + xorMappedMap map[string]*xorMapped +} + +// UniversalUDPMuxParams are parameters for UniversalUDPMux server reflexive. +type UniversalUDPMuxParams struct { + Logger logging.LeveledLogger + UDPConn net.PacketConn + XORMappedAddrCacheTTL time.Duration +} + +// NewUniversalUDPMuxDefault creates an implementation of UniversalUDPMux embedding UDPMux +func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDefault { + if params.Logger == nil { + params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice") + } + if params.XORMappedAddrCacheTTL == 0 { + params.XORMappedAddrCacheTTL = time.Second * 25 + } + + m := &UniversalUDPMuxDefault{ + params: params, + xorMappedMap: make(map[string]*xorMapped), + } + + // wrap UDP connection, process server reflexive messages + // before they are passed to the UDPMux connection handler (connWorker) + m.params.UDPConn = &udpConn{ + PacketConn: params.UDPConn, + mux: m, + logger: params.Logger, + } + + // embed UDPMux + udpMuxParams := UDPMuxParams{ + Logger: params.Logger, + UDPConn: m.params.UDPConn, + } + m.UDPMuxDefault = NewUDPMuxDefault(udpMuxParams) + + return m +} + +// udpConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets +type udpConn struct { + net.PacketConn + mux *UniversalUDPMuxDefault + logger logging.LeveledLogger +} + +func (m *UniversalUDPMuxDefault) GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error) { + return nil, fmt.Errorf("not implemented yet") +} + +// GetConnForURL add uniques to the muxed connection by concatenating ufrag and URL (e.g. STUN URL) to be able to support multiple STUN/TURN servers +// and return a unique connection per server. +func (m *UniversalUDPMuxDefault) GetConnForURL(ufrag string, url string) (net.PacketConn, error) { + return m.UDPMuxDefault.GetConn(fmt.Sprintf("%s%s", ufrag, url)) +} + +// ReadFrom is called by UDPMux connWorker and handles packets coming from the STUN server discovering a mapped address. +// It passes processed packets further to the UDPMux (maybe this is not really necessary). +func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, addr, err = c.PacketConn.ReadFrom(p) + if err != nil { + return + } + + if stun.IsMessage(p[:n]) { + msg := &stun.Message{ + Raw: append([]byte{}, p[:n]...), + } + + if err = msg.Decode(); err != nil { + c.logger.Warnf("Failed to handle decode ICE from %s: %v\n", addr.String(), err) + return n, addr, nil + } + + udpAddr, ok := addr.(*net.UDPAddr) + if !ok { + // message about this err will be logged in the UDPMux + return + } + + if c.mux.isXORMappedResponse(msg, udpAddr.String()) { + err = c.mux.handleXORMappedResponse(udpAddr, msg) + if err != nil { + c.logger.Debugf("%w: %v", errGetXorMappedAddrResponse, err) + return n, addr, nil + } + return + } + } + return +} + +// isXORMappedResponse indicates whether the message is a XORMappedAddress and is coming from the known STUN server. +func (m *UniversalUDPMuxDefault) isXORMappedResponse(msg *stun.Message, stunAddr string) bool { + m.mu.Lock() + defer m.mu.Unlock() + // check first if it is a STUN server address because remote peer can also send similar messages but as a BindingSuccess + _, ok := m.xorMappedMap[stunAddr] + _, err := msg.Get(stun.AttrXORMappedAddress) + return err == nil && ok +} + +// handleXORMappedResponse parses response from the STUN server, extracts XORMappedAddress attribute +// and set the mapped address for the server +func (m *UniversalUDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, msg *stun.Message) error { + m.mu.Lock() + defer m.mu.Unlock() + + mappedAddr, ok := m.xorMappedMap[stunAddr.String()] + if !ok { + return fmt.Errorf("no address map for %s", stunAddr.String()) + } + + var addr stun.XORMappedAddress + if err := addr.GetFrom(msg); err != nil { + return err + } + + m.xorMappedMap[stunAddr.String()] = mappedAddr + mappedAddr.SetAddr(&addr) + + return nil +} + +// GetXORMappedAddr returns *stun.XORMappedAddress if already present for a given STUN server. +// Makes a STUN binding request to discover mapped address otherwise. +// Blocks until the stun.XORMappedAddress has been discovered or deadline. +// Method is safe for concurrent use. +func (m *UniversalUDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) { + m.mu.Lock() + mappedAddr, ok := m.xorMappedMap[serverAddr.String()] + // if we already have a mapping for this STUN server (address already recieved) + // and if it is not too old we return it without making a new request to STUN server + if ok { + if mappedAddr.expired() { + mappedAddr.closeWaiters() + delete(m.xorMappedMap, serverAddr.String()) + ok = false + } else if mappedAddr.pending() { + ok = false + } + } + m.mu.Unlock() + if ok { + return mappedAddr.addr, nil + } + + // otherwise, make a STUN request to discover the address + // or wait for already sent request to complete + waitAddrReceived, err := m.sendStun(serverAddr) + if err != nil { + return nil, fmt.Errorf("could not send STUN request: %v", err) + } + + // block until response was handled by the connWorker routine and XORMappedAddress was updated + select { + case <-waitAddrReceived: + // when channel closed, addr was obtained + m.mu.Lock() + mappedAddr := *m.xorMappedMap[serverAddr.String()] + m.mu.Unlock() + if mappedAddr.addr == nil { + return nil, fmt.Errorf("no XORMappedAddress for %s", serverAddr.String()) + } + return mappedAddr.addr, nil + case <-time.After(deadline): + return nil, fmt.Errorf("timeout while waiting for XORMappedAddr") + } +} + +// sendStun sends a STUN request via UDP conn. +// +// The returned channel is closed when the STUN response has been received. +// Method is safe for concurrent use. +func (m *UniversalUDPMuxDefault) sendStun(serverAddr net.Addr) (chan struct{}, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // if record present in the map, we already sent a STUN request, + // just wait when waitAddrRecieved will be closed + addrMap, ok := m.xorMappedMap[serverAddr.String()] + if !ok { + addrMap = &xorMapped{ + expiresAt: time.Now().Add(m.params.XORMappedAddrCacheTTL), + waitAddrReceived: make(chan struct{}), + } + m.xorMappedMap[serverAddr.String()] = addrMap + } + + req, err := stun.Build(stun.BindingRequest, stun.TransactionID) + if err != nil { + return nil, err + } + + if _, err = m.params.UDPConn.WriteTo(req.Raw, serverAddr); err != nil { + return nil, err + } + + return addrMap.waitAddrReceived, nil +} + +type xorMapped struct { + addr *stun.XORMappedAddress + waitAddrReceived chan struct{} + expiresAt time.Time +} + +func (a *xorMapped) closeWaiters() { + select { + case <-a.waitAddrReceived: + // notify was close, ok, that means we received duplicate response + // just exit + break + default: + // notify tha twe have a new addr + close(a.waitAddrReceived) + } +} + +func (a *xorMapped) pending() bool { + return a.addr == nil +} + +func (a *xorMapped) expired() bool { + return a.expiresAt.Before(time.Now()) +} + +func (a *xorMapped) SetAddr(addr *stun.XORMappedAddress) { + a.addr = addr + a.closeWaiters() +} diff --git a/udp_mux_universal_test.go b/udp_mux_universal_test.go new file mode 100644 index 00000000..4c00b298 --- /dev/null +++ b/udp_mux_universal_test.go @@ -0,0 +1,123 @@ +package ice + +import ( + "github.com/pion/stun" + "github.com/stretchr/testify/require" + "net" + "sync" + "testing" + "time" +) + +func TestUniversalUDPMux(t *testing.T) { + conn, err := net.ListenUDP(udp, &net.UDPAddr{}) + require.NoError(t, err) + + udpMux := NewUniversalUDPMuxDefault(UniversalUDPMuxParams{ + Logger: nil, + UDPConn: conn, + }) + + require.NoError(t, err) + defer func() { + _ = udpMux.Close() + _ = conn.Close() + }() + + require.NotNil(t, udpMux.LocalAddr(), "tcpMux.LocalAddr() is nil") + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + testMuxSrflxConnection(t, udpMux, "ufrag4", udp) + }() + + wg.Wait() + +} + +func testMuxSrflxConnection(t *testing.T, udpMux *UniversalUDPMuxDefault, ufrag string, network string) { + pktConn, err := udpMux.GetConn(ufrag) + require.NoError(t, err, "error retrieving muxed connection for ufrag") + defer func() { + _ = pktConn.Close() + }() + + remoteConn, err := net.DialUDP(network, nil, &net.UDPAddr{ + Port: udpMux.LocalAddr().(*net.UDPAddr).Port, + }) + require.NoError(t, err, "error dialing test udp connection") + defer func() { + _ = remoteConn.Close() + }() + + // use small value for TTL to check expiration of the address + udpMux.params.XORMappedAddrCacheTTL = time.Millisecond * 20 + testXORIP := net.ParseIP("213.141.156.236") + testXORPort := 21254 + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) + require.NoError(t, err) + require.NotNil(t, address) + require.True(t, address.IP.Equal(testXORIP)) + require.Equal(t, address.Port, testXORPort) + }() + + // wait until GetXORMappedAddr calls sendStun method + time.Sleep(time.Millisecond) + + // check that mapped address filled correctly after sent stun + udpMux.mu.Lock() + mappedAddr, ok := udpMux.xorMappedMap[remoteConn.LocalAddr().String()] + require.True(t, ok) + require.NotNil(t, mappedAddr) + require.True(t, mappedAddr.pending()) + require.False(t, mappedAddr.expired()) + udpMux.mu.Unlock() + + // clean receiver read buffer + buf := make([]byte, receiveMTU) + _, err = remoteConn.Read(buf) + require.NoError(t, err) + + // write back to udpMux XOR message with address + msg := stun.New() + msg.Type = stun.MessageType{Method: stun.MethodBinding, Class: stun.ClassRequest} + msg.Add(stun.AttrUsername, []byte(ufrag+":otherufrag")) + addr := &stun.XORMappedAddress{ + IP: testXORIP, + Port: testXORPort, + } + addr.AddTo(msg) + msg.Encode() + _, err = remoteConn.Write(msg.Raw) + require.NoError(t, err) + + // wait for the packet to be consumed and parsed by udpMux + wg.Wait() + + // we should get address immediately from the cached map + address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) + require.NoError(t, err) + require.NotNil(t, address) + + udpMux.mu.Lock() + // check mappedAddr is not pending, we didn't send stun twice + require.False(t, mappedAddr.pending()) + + // check expiration by TTL + time.Sleep(time.Millisecond * 21) + require.True(t, mappedAddr.expired()) + udpMux.mu.Unlock() + + // after expire, we send stun request again + // but we not receive response in 5 milliseconds and should get error here + address, err = udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Millisecond*5) + require.NotNil(t, err) + require.Nil(t, address) +} From c47b7d96ed6ad95cd125f2da91c516de32315567 Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 17 Feb 2022 14:15:36 +0100 Subject: [PATCH 4/8] Test sanity checks --- .github/workflows/test.yaml | 4 ++-- .github/workflows/tidy-check.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 43608f13..2b6f3552 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -13,10 +13,10 @@ name: Test on: push: branches: - - master + - * pull_request: branches: - - master + - * jobs: test: runs-on: ubuntu-latest diff --git a/.github/workflows/tidy-check.yaml b/.github/workflows/tidy-check.yaml index 03b5189d..470e5abb 100644 --- a/.github/workflows/tidy-check.yaml +++ b/.github/workflows/tidy-check.yaml @@ -13,10 +13,10 @@ name: Go mod tidy on: pull_request: branches: - - master + - * push: branches: - - master + - * jobs: Check: From 6da5ad620afbd38073a0ee21296af19a11f70b90 Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 17 Feb 2022 14:42:13 +0100 Subject: [PATCH 5/8] Fix lint errors This commit fixes lint issues detected by github actions --- gather.go | 3 --- udp_mux_universal_test.go | 7 ++++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/gather.go b/gather.go index 8bb1f96f..8d2c40db 100644 --- a/gather.go +++ b/gather.go @@ -347,9 +347,6 @@ func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, ne } for i := range urls { - if !(urls[i].Scheme == SchemeTypeSTUN || urls[i].Scheme == SchemeTypeSTUNS) { - continue - } wg.Add(1) go func(url URL, network string) { defer wg.Done() diff --git a/udp_mux_universal_test.go b/udp_mux_universal_test.go index 4c00b298..caafa065 100644 --- a/udp_mux_universal_test.go +++ b/udp_mux_universal_test.go @@ -1,3 +1,6 @@ +//go:build !js +// +build !js + package ice import ( @@ -93,7 +96,9 @@ func testMuxSrflxConnection(t *testing.T, udpMux *UniversalUDPMuxDefault, ufrag IP: testXORIP, Port: testXORPort, } - addr.AddTo(msg) + err = addr.AddTo(msg) + require.NoError(t, err) + msg.Encode() _, err = remoteConn.Write(msg.Raw) require.NoError(t, err) From d11e90c206b60ab42515ce63587fd0952c767c6e Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 17 Feb 2022 15:10:34 +0100 Subject: [PATCH 6/8] Fix lint issues\ This commit fixes lint issues detected by github actions --- udp_mux_universal.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/udp_mux_universal.go b/udp_mux_universal.go index ea9c5e79..1c3178b2 100644 --- a/udp_mux_universal.go +++ b/udp_mux_universal.go @@ -75,6 +75,8 @@ type udpConn struct { logger logging.LeveledLogger } +// GetRelayedAddr creates relayed connection to the given TURN service and returns the relayed addr. +// Not implemented yet. func (m *UniversalUDPMuxDefault) GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error) { return nil, fmt.Errorf("not implemented yet") } From 5cc5e309e574746c5e1cd32de863dd1e20eb992d Mon Sep 17 00:00:00 2001 From: braginini Date: Thu, 17 Feb 2022 19:34:46 +0100 Subject: [PATCH 7/8] Fix lint errors This commit fixes lint issues related to the errors --- errors.go | 4 ++++ udp_mux_universal.go | 16 ++++++++-------- udp_mux_universal_test.go | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/errors.go b/errors.go index 8ca9c2cd..2ef595e5 100644 --- a/errors.go +++ b/errors.go @@ -135,4 +135,8 @@ var ( errICEWriteSTUNMessage = errors.New("the ICE conn can't write STUN messages") errUDPMuxDisabled = errors.New("UDPMux is not enabled") errCandidateIPNotFound = errors.New("could not determine local IP for Mux candidate") + errNoXorAddrMapping = errors.New("no address mapping") + errSendSTUNPacket = errors.New("failed to send STUN packet") + errXORMappedAddrTimeout = errors.New("timeout while waiting for XORMappedAddr") + errNotImplemented = errors.New("not implemented yet") ) diff --git a/udp_mux_universal.go b/udp_mux_universal.go index 1c3178b2..29653b0d 100644 --- a/udp_mux_universal.go +++ b/udp_mux_universal.go @@ -78,7 +78,7 @@ type udpConn struct { // GetRelayedAddr creates relayed connection to the given TURN service and returns the relayed addr. // Not implemented yet. func (m *UniversalUDPMuxDefault) GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error) { - return nil, fmt.Errorf("not implemented yet") + return nil, errNotImplemented } // GetConnForURL add uniques to the muxed connection by concatenating ufrag and URL (e.g. STUN URL) to be able to support multiple STUN/TURN servers @@ -120,7 +120,7 @@ func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { return } } - return + return n, addr, err } // isXORMappedResponse indicates whether the message is a XORMappedAddress and is coming from the known STUN server. @@ -141,7 +141,7 @@ func (m *UniversalUDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, mappedAddr, ok := m.xorMappedMap[stunAddr.String()] if !ok { - return fmt.Errorf("no address map for %s", stunAddr.String()) + return errNoXorAddrMapping } var addr stun.XORMappedAddress @@ -162,7 +162,7 @@ func (m *UniversalUDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, func (m *UniversalUDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) { m.mu.Lock() mappedAddr, ok := m.xorMappedMap[serverAddr.String()] - // if we already have a mapping for this STUN server (address already recieved) + // if we already have a mapping for this STUN server (address already received) // and if it is not too old we return it without making a new request to STUN server if ok { if mappedAddr.expired() { @@ -182,7 +182,7 @@ func (m *UniversalUDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline // or wait for already sent request to complete waitAddrReceived, err := m.sendStun(serverAddr) if err != nil { - return nil, fmt.Errorf("could not send STUN request: %v", err) + return nil, errSendSTUNPacket } // block until response was handled by the connWorker routine and XORMappedAddress was updated @@ -193,11 +193,11 @@ func (m *UniversalUDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline mappedAddr := *m.xorMappedMap[serverAddr.String()] m.mu.Unlock() if mappedAddr.addr == nil { - return nil, fmt.Errorf("no XORMappedAddress for %s", serverAddr.String()) + return nil, errNoXorAddrMapping } return mappedAddr.addr, nil case <-time.After(deadline): - return nil, fmt.Errorf("timeout while waiting for XORMappedAddr") + return nil, errXORMappedAddrTimeout } } @@ -210,7 +210,7 @@ func (m *UniversalUDPMuxDefault) sendStun(serverAddr net.Addr) (chan struct{}, e defer m.mu.Unlock() // if record present in the map, we already sent a STUN request, - // just wait when waitAddrRecieved will be closed + // just wait when waitAddrReceived will be closed addrMap, ok := m.xorMappedMap[serverAddr.String()] if !ok { addrMap = &xorMapped{ diff --git a/udp_mux_universal_test.go b/udp_mux_universal_test.go index caafa065..00930359 100644 --- a/udp_mux_universal_test.go +++ b/udp_mux_universal_test.go @@ -64,8 +64,8 @@ func testMuxSrflxConnection(t *testing.T, udpMux *UniversalUDPMuxDefault, ufrag wg.Add(1) go func() { defer wg.Done() - address, err := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) - require.NoError(t, err) + address, e := udpMux.GetXORMappedAddr(remoteConn.LocalAddr(), time.Second) + require.NoError(t, e) require.NotNil(t, address) require.True(t, address.IP.Equal(testXORIP)) require.Equal(t, address.Port, testXORPort) From d2003e3570a6a924e45407337426e701e9ad5083 Mon Sep 17 00:00:00 2001 From: braginini Date: Fri, 18 Feb 2022 08:54:07 +0100 Subject: [PATCH 8/8] My head line My content line. --- .github/lint-commit-message.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/lint-commit-message.sh b/.github/lint-commit-message.sh index 010a3328..d1194645 100755 --- a/.github/lint-commit-message.sh +++ b/.github/lint-commit-message.sh @@ -29,6 +29,7 @@ EndOfMessage exit 1 } + lint_commit_message() { if [[ "$(echo "$1" | awk 'NR == 2 {print $1;}' | wc -c)" -ne 1 ]]; then display_commit_message_error "$1" 'Separate subject from body with a blank line'