Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/lint-commit-message.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ EndOfMessage
exit 1
}


lint_commit_message() {
if [[ "$(echo "$1" | awk 'NR == 2 {print $1;}' | wc -c)" -ne 1 ]]; then
display_commit_message_error "$1" 'Separate subject from body with a blank line'
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ name: Test
on:
push:
branches:
- master
- *
pull_request:
branches:
- master
- *
jobs:
test:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tidy-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ name: Go mod tidy
on:
pull_request:
branches:
- master
- *
push:
branches:
- master
- *

jobs:
Check:
Expand Down
11 changes: 8 additions & 3 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ type Agent struct {
loggerFactory logging.LoggerFactory
log logging.LeveledLogger

net *vnet.Net
tcpMux TCPMux
udpMux UDPMux
net *vnet.Net
tcpMux TCPMux
udpMux UDPMux
udpMuxSrflx UniversalUDPMux

interfaceFilter func(string) bool

Expand Down Expand Up @@ -319,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
a.tcpMux = newInvalidTCPMux()
}
a.udpMux = config.UDPMux
a.udpMuxSrflx = config.UDPMuxSrflx

if a.net == nil {
a.net = vnet.NewNet(nil)
Expand Down Expand Up @@ -892,6 +894,9 @@ func (a *Agent) removeUfragFromMux() {
if a.udpMux != nil {
a.udpMux.RemoveConnByUfrag(a.localUfrag)
}
if a.udpMuxSrflx != nil {
a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
}
}

// Close cleans up the Agent
Expand Down
5 changes: 5 additions & 0 deletions agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ type AgentConfig struct {
// defer to UDPMux for incoming connections
UDPMux UDPMux

// UDPMux is used for multiplexing multiple incoming UDP connections on a single port
// when this is set, the agent ignores PortMin and PortMax configurations and will
// defer to UDPMux for incoming connections
UDPMuxSrflx UniversalUDPMux

// Proxy Dialer is a dialer that should be implemented by the user based on golang.org/x/net/proxy
// dial interface in order to support corporate proxies
ProxyDialer proxy.Dialer
Expand Down
4 changes: 4 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ var (
errICEWriteSTUNMessage = errors.New("the ICE conn can't write STUN messages")
errUDPMuxDisabled = errors.New("UDPMux is not enabled")
errCandidateIPNotFound = errors.New("could not determine local IP for Mux candidate")
errNoXorAddrMapping = errors.New("no address mapping")
errSendSTUNPacket = errors.New("failed to send STUN packet")
errXORMappedAddrTimeout = errors.New("timeout while waiting for XORMappedAddr")
errNotImplemented = errors.New("not implemented yet")
)
68 changes: 67 additions & 1 deletion gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func (a *Agent) gatherCandidates(ctx context.Context) {
case CandidateTypeServerReflexive:
wg.Add(1)
go func() {
a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
if a.udpMuxSrflx != nil {
a.gatherCandidatesSrflxUDPMux(ctx, a.urls, a.networkTypes)
} else {
a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
}
wg.Done()
}()
if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeServerReflexive {
Expand Down Expand Up @@ -333,6 +337,68 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []
}
}

func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*URL, networkTypes []NetworkType) {
var wg sync.WaitGroup
defer wg.Wait()

for _, networkType := range networkTypes {
if networkType.IsTCP() {
continue
}

for i := range urls {
wg.Add(1)
go func(url URL, network string) {
defer wg.Done()

hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
if err != nil {
a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err)
return
}

xoraddr, err := a.udpMuxSrflx.GetXORMappedAddr(serverAddr, stunGatherTimeout)
if err != nil {
a.log.Warnf("could not get server reflexive address %s %s: %v\n", network, url, err)
return
}

conn, err := a.udpMuxSrflx.GetConnForURL(a.localUfrag, url.String())
if err != nil {
a.log.Warnf("could not find connection in UDPMuxSrflx %s %s: %v\n", network, url, err)
return
}

ip := xoraddr.IP
port := xoraddr.Port

laddr := conn.LocalAddr().(*net.UDPAddr)
srflxConfig := CandidateServerReflexiveConfig{
Network: network,
Address: ip.String(),
Port: port,
Component: ComponentRTP,
RelAddr: laddr.IP.String(),
RelPort: laddr.Port,
}
c, err := NewCandidateServerReflexive(&srflxConfig)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
return
}

if err := a.addCandidate(ctx, c, conn); err != nil {
if closeErr := c.close(); closeErr != nil {
a.log.Warnf("Failed to close candidate: %v", closeErr)
}
a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
}
}(*urls[i], networkType.String())
}
}
}

func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkTypes []NetworkType) {
var wg sync.WaitGroup
defer wg.Wait()
Expand Down
10 changes: 7 additions & 3 deletions udp_mux.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package ice

import (
"github.com/pion/stun"
"io"
"net"
"os"
"strings"
"sync"

"github.com/pion/logging"
"github.com/pion/stun"
)

// UDPMux allows multiple connections to go over a single UDP port
Expand Down Expand Up @@ -97,12 +97,16 @@ func (m *UDPMuxDefault) GetConn(ufrag string) (net.PacketConn, error) {
return c, nil
}

// RemoveConnByUfrag stops and removes the muxed packet connection
// RemoveConnByUfrag stops and removes the all muxed packet connections with a ufrag prefix
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
// it happens probably when we close not yet initialized agent
if ufrag == "" {
return
}
m.mu.Lock()
removedConns := make([]*udpMuxedConn, 0)
for key := range m.conns {
if key != ufrag {
if !strings.HasPrefix(key, ufrag) {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion udp_mux_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !js
// +build !js

package ice
Expand Down Expand Up @@ -59,7 +60,6 @@ func TestUDPMux(t *testing.T) {
if ptrSize != 32 {
testMuxConnection(t, udpMux, "ufrag3", "udp6")
}

wg.Wait()

require.NoError(t, udpMux.Close())
Expand Down
Loading