Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
)
Expand All @@ -29,7 +30,7 @@ var log = logging.Logger("p2p-config")
type AddrsFactory = bhost.AddrsFactory

// NATManagerC is a NATManager constructor.
type NATManagerC func(inet.Network) bhost.NATManager
type NATManagerC func(inet.Network, *identify.IDService) bhost.NATManager

// Config describes a set of settings for a libp2p node
//
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type HostOpts struct {

// NATManager takes care of setting NAT port mappings, and discovering external addresses.
// If omitted, this will simply be disabled.
NATManager func(inet.Network) NATManager
NATManager func(inet.Network, *identify.IDService) NATManager

// ConnManager is a libp2p connection manager
ConnManager ifconnmgr.ConnManager
Expand Down Expand Up @@ -136,7 +136,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
}

if opts.NATManager != nil {
h.natmgr = opts.NATManager(net)
h.natmgr = opts.NATManager(net, h.ids)
}

if opts.MultiaddrResolver != nil {
Expand Down
196 changes: 181 additions & 15 deletions p2p/host/basic/natmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package basichost

import (
"context"
"strconv"
"sync"
"time"

goprocess "github.com/jbenet/goprocess"
lgbl "github.com/libp2p/go-libp2p-loggables"
inat "github.com/libp2p/go-libp2p-nat"
inet "github.com/libp2p/go-libp2p-net"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -25,8 +29,8 @@ type NATManager interface {
}

// Create a NAT manager.
func NewNATManager(net inet.Network) NATManager {
return newNatManager(net)
func NewNATManager(net inet.Network, ids *identify.IDService) NATManager {
return newNatManager(net, ids)
}

// natManager takes care of adding + removing port mappings to the nat.
Expand All @@ -36,18 +40,20 @@ func NewNATManager(net inet.Network) NATManager {
// as the network signals Listen() or ListenClose().
// * closing the natManager closes the nat and its mappings.
type natManager struct {
net inet.Network
natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.)
nat *inat.NAT
net inet.Network
idService *identify.IDService
natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.)
nat *inat.NAT

ready chan struct{} // closed once the nat is ready to process port mappings
proc goprocess.Process // natManager has a process + children. can be closed.
}

func newNatManager(net inet.Network) *natManager {
func newNatManager(net inet.Network, ids *identify.IDService) *natManager {
nmgr := &natManager{
net: net,
ready: make(chan struct{}),
net: net,
idService: ids,
ready: make(chan struct{}),
}

nmgr.proc = goprocess.WithTeardown(func() error {
Expand All @@ -58,6 +64,8 @@ func newNatManager(net inet.Network) *natManager {

// discover the nat.
nmgr.discoverNAT()
// sync observed addresses with NAT port mapping configs
nmgr.syncObservedAddresses()
return nmgr
}

Expand Down Expand Up @@ -129,7 +137,163 @@ func (nmgr *natManager) discoverNAT() {
addrs := nmgr.net.ListenAddresses()
for _, addr := range addrs {
// we do it async because it's slow and we may want to close beforehand
go addPortMapping(nmgr, addr)
go addPortMapping(nmgr, addr, inat.RandomMappingExternalPort)
}
})
}

// parseAddress extracts IP address and port from a Multiaddr
func parseAddress(a ma.Multiaddr) (ip string, port int, err error) {
ip, err = a.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
ip, err = a.ValueForProtocol(multiaddr.P_IP6)
if err != nil { // don't handle circuit-relay and other addresses
return
}
}
portString, err := a.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return
}
port, err = strconv.Atoi(portString)
return ip, port, err
}

func parseLocalIPs(listenAddresses []ma.Multiaddr) (localIPs []string) {
for _, la := range listenAddresses {
ip, _, err := parseAddress(la)
if err == nil {
localIPs = append(localIPs, ip)
}
}
return localIPs
}

func parseExtraPorts(observed []ma.Multiaddr, mapped []inat.Mapping,
localIPs []string) (extraPorts []int) {

alreadyMapped := make(map[string]bool)
for _, m := range mapped {
externalAddr, err := m.ExternalAddr()
if err != nil {
continue
}
alreadyMapped[externalAddr.String()] = true
}

isLocal := make(map[string]bool)
for _, ip := range localIPs {
isLocal[ip] = true
}

for _, a := range observed {
if alreadyMapped[a.String()] {
continue
}
ip, port, err := parseAddress(a)
if err != nil {
continue
}
if isLocal[ip] {
continue
}
extraPorts = append(extraPorts, port)
}

return extraPorts
}

func parseExcessivePorts(observed []ma.Multiaddr, mapped []inat.Mapping) (excessiveMaps []inat.Mapping) {
observedMap := make(map[string]bool)
for _, a := range observed {
observedMap[a.String()] = true
}
for _, m := range mapped {
externalAddr, err := m.ExternalAddr()
if err != nil {
continue
}
if !observedMap[externalAddr.String()] {
excessiveMaps = append(excessiveMaps, m)
}
}
return excessiveMaps
}

const closeMappingThreshold = 10

func (nmgr *natManager) syncObservedAddresses() {
nmgr.proc.Go(func(worker goprocess.Process) {
// wait until NAT is ready, or natManager quits
select {
case <-worker.Closing():
return
case <-nmgr.ready:
}

nmgr.natmu.Lock()
if nmgr.nat == nil {
nmgr.natmu.Unlock()
return
}
natInstance := nmgr.nat
worker.AddChild(natInstance.Process())
nmgr.natmu.Unlock()

// `mappingNotSeenCount` keeps how many times a mapping is configured, but not observed.
// If a mapping is not observed for consecutive `closeMappingThreshold` times, close it.
mappingNotSeenCount := make(map[string]int)

ticker := time.NewTicker(inat.MappingDuration)
for {
select {
case <-worker.Closing():
ticker.Stop()
return
case <-ticker.C:
}

listened, listenedErr := nmgr.net.InterfaceListenAddresses()
if listenedErr != nil {
log.Infof("Failed to get interface listen addresses: %v", listenedErr)
continue
}
// get local IP addresses
localIPs := parseLocalIPs(listened)
// get NAT mappings we already configured
mapped := nmgr.nat.Mappings()
// get observed addresses from peers
observed := nmgr.idService.OwnObservedAddrs()
// portsToMap are those external ports observed by peers, but not configured
portsToMap := parseExtraPorts(observed, mapped, localIPs)
// mappingsToClose are ports configured, but not observed by peers
mappingsToClose := parseExcessivePorts(observed, mapped)

log.Debugf("observed %+v, listened %+v, listenedErr %+v, portsToMap %+v\n",
observed, listened, listenedErr, portsToMap)

// remove observed addresses from `mappingNotSeenCount` to prevent them from being deleted
for _, addr := range observed {
delete(mappingNotSeenCount, addr.String())
}
// maintain `mappingNotSeenCount`, close mappings not observed more than `closeMappingThreshold` times
for _, m := range mappingsToClose {
externalAddr, err := m.ExternalAddr()
if err != nil {
continue
}
mappingNotSeenCount[externalAddr.String()] += 1
if mappingNotSeenCount[externalAddr.String()] > closeMappingThreshold {
m.Close()
delete(mappingNotSeenCount, externalAddr.String())
}
}
// add port mappings for `portsToMap`
for _, port := range portsToMap {
for _, listenAddr := range nmgr.net.ListenAddresses() {
addPortMapping(nmgr, listenAddr, port)
}
}
}
})
}
Expand All @@ -143,16 +307,18 @@ func (nmgr *natManager) NAT() *inat.NAT {
return nmgr.nat
}

func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) {
func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr, externalPort int) {
nat := nmgr.NAT()
if nat == nil {
panic("natManager addPortMapping called without a nat.")
}

// first, check if the port mapping already exists.
for _, mapping := range nat.Mappings() {
if mapping.InternalAddr().Equal(intaddr) {
return // it exists! return.
if externalPort == inat.RandomMappingExternalPort {
for _, mapping := range nat.Mappings() {
if mapping.InternalAddr().Equal(intaddr) {
return // it exists! return.
}
}
}

Expand All @@ -173,7 +339,7 @@ func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) {
defer log.EventBegin(ctx, "natMgrAddPortMapping", lm).Done()

// get the nat
m, err := nat.NewMapping(intaddr)
m, err := nat.NewMapping(intaddr, externalPort)
if err != nil {
lm["outcome"] = "failure"
lm["error"] = err
Expand Down Expand Up @@ -223,7 +389,7 @@ func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) {
return // not ready or doesnt exist.
}

addPortMapping(nn.natManager(), addr)
addPortMapping(nn.natManager(), addr, inat.RandomMappingExternalPort)
}

func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
lmaddrs = append(lmaddrs, maddr)
}

// if the address reported by the connection roughly matches their annoucned
// if the address reported by the connection roughly matches their announced
// listener addresses, its likely to be an external NAT address
if HasConsistentTransport(c.RemoteMultiaddr(), lmaddrs) {
lmaddrs = append(lmaddrs, c.RemoteMultiaddr())
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const ActivationThresh = 4
// We only use addresses that:
// - have been observed at least 4 times in last 1h. (counter symmetric nats)
// - have been observed at least once recently (1h), because our position in the
// network, or network port mapppings, may have changed.
// network, or network port mappings, may have changed.
type ObservedAddr struct {
Addr ma.Multiaddr
SeenBy map[string]time.Time
Expand Down