diff --git a/config/config.go b/config/config.go index b017fea60e..40f2646563 100644 --- a/config/config.go +++ b/config/config.go @@ -18,6 +18,7 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" swarm "github.com/libp2p/go-libp2p-swarm" tptu "github.com/libp2p/go-libp2p-transport-upgrader" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" filter "github.com/libp2p/go-maddr-filter" ma "github.com/multiformats/go-multiaddr" ) @@ -29,7 +30,7 @@ var log = logging.Logger("p2p-config") type AddrsFactory = bhost.AddrsFactory // NATManagerC is a NATManager constructor. -type NATManagerC func(inet.Network) bhost.NATManager +type NATManagerC func(inet.Network, *identify.IDService) bhost.NATManager // Config describes a set of settings for a libp2p node // diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 2deb016f1f..87fa51ea43 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -93,7 +93,7 @@ type HostOpts struct { // NATManager takes care of setting NAT port mappings, and discovering external addresses. // If omitted, this will simply be disabled. - NATManager func(inet.Network) NATManager + NATManager func(inet.Network, *identify.IDService) NATManager // ConnManager is a libp2p connection manager ConnManager ifconnmgr.ConnManager @@ -136,7 +136,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, } if opts.NATManager != nil { - h.natmgr = opts.NATManager(net) + h.natmgr = opts.NATManager(net, h.ids) } if opts.MultiaddrResolver != nil { diff --git a/p2p/host/basic/natmgr.go b/p2p/host/basic/natmgr.go index 349f79e789..f6e468631e 100644 --- a/p2p/host/basic/natmgr.go +++ b/p2p/host/basic/natmgr.go @@ -2,12 +2,16 @@ package basichost import ( "context" + "strconv" "sync" + "time" goprocess "github.com/jbenet/goprocess" lgbl "github.com/libp2p/go-libp2p-loggables" inat "github.com/libp2p/go-libp2p-nat" inet "github.com/libp2p/go-libp2p-net" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" ) @@ -25,8 +29,8 @@ type NATManager interface { } // Create a NAT manager. -func NewNATManager(net inet.Network) NATManager { - return newNatManager(net) +func NewNATManager(net inet.Network, ids *identify.IDService) NATManager { + return newNatManager(net, ids) } // natManager takes care of adding + removing port mappings to the nat. @@ -36,18 +40,20 @@ func NewNATManager(net inet.Network) NATManager { // as the network signals Listen() or ListenClose(). // * closing the natManager closes the nat and its mappings. type natManager struct { - net inet.Network - natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.) - nat *inat.NAT + net inet.Network + idService *identify.IDService + natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.) + nat *inat.NAT ready chan struct{} // closed once the nat is ready to process port mappings proc goprocess.Process // natManager has a process + children. can be closed. } -func newNatManager(net inet.Network) *natManager { +func newNatManager(net inet.Network, ids *identify.IDService) *natManager { nmgr := &natManager{ - net: net, - ready: make(chan struct{}), + net: net, + idService: ids, + ready: make(chan struct{}), } nmgr.proc = goprocess.WithTeardown(func() error { @@ -58,6 +64,8 @@ func newNatManager(net inet.Network) *natManager { // discover the nat. nmgr.discoverNAT() + // sync observed addresses with NAT port mapping configs + nmgr.syncObservedAddresses() return nmgr } @@ -129,7 +137,163 @@ func (nmgr *natManager) discoverNAT() { addrs := nmgr.net.ListenAddresses() for _, addr := range addrs { // we do it async because it's slow and we may want to close beforehand - go addPortMapping(nmgr, addr) + go addPortMapping(nmgr, addr, inat.RandomMappingExternalPort) + } + }) +} + +// parseAddress extracts IP address and port from a Multiaddr +func parseAddress(a ma.Multiaddr) (ip string, port int, err error) { + ip, err = a.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + ip, err = a.ValueForProtocol(multiaddr.P_IP6) + if err != nil { // don't handle circuit-relay and other addresses + return + } + } + portString, err := a.ValueForProtocol(multiaddr.P_TCP) + if err != nil { + return + } + port, err = strconv.Atoi(portString) + return ip, port, err +} + +func parseLocalIPs(listenAddresses []ma.Multiaddr) (localIPs []string) { + for _, la := range listenAddresses { + ip, _, err := parseAddress(la) + if err == nil { + localIPs = append(localIPs, ip) + } + } + return localIPs +} + +func parseExtraPorts(observed []ma.Multiaddr, mapped []inat.Mapping, + localIPs []string) (extraPorts []int) { + + alreadyMapped := make(map[string]bool) + for _, m := range mapped { + externalAddr, err := m.ExternalAddr() + if err != nil { + continue + } + alreadyMapped[externalAddr.String()] = true + } + + isLocal := make(map[string]bool) + for _, ip := range localIPs { + isLocal[ip] = true + } + + for _, a := range observed { + if alreadyMapped[a.String()] { + continue + } + ip, port, err := parseAddress(a) + if err != nil { + continue + } + if isLocal[ip] { + continue + } + extraPorts = append(extraPorts, port) + } + + return extraPorts +} + +func parseExcessivePorts(observed []ma.Multiaddr, mapped []inat.Mapping) (excessiveMaps []inat.Mapping) { + observedMap := make(map[string]bool) + for _, a := range observed { + observedMap[a.String()] = true + } + for _, m := range mapped { + externalAddr, err := m.ExternalAddr() + if err != nil { + continue + } + if !observedMap[externalAddr.String()] { + excessiveMaps = append(excessiveMaps, m) + } + } + return excessiveMaps +} + +const closeMappingThreshold = 10 + +func (nmgr *natManager) syncObservedAddresses() { + nmgr.proc.Go(func(worker goprocess.Process) { + // wait until NAT is ready, or natManager quits + select { + case <-worker.Closing(): + return + case <-nmgr.ready: + } + + nmgr.natmu.Lock() + if nmgr.nat == nil { + nmgr.natmu.Unlock() + return + } + natInstance := nmgr.nat + worker.AddChild(natInstance.Process()) + nmgr.natmu.Unlock() + + // `mappingNotSeenCount` keeps how many times a mapping is configured, but not observed. + // If a mapping is not observed for consecutive `closeMappingThreshold` times, close it. + mappingNotSeenCount := make(map[string]int) + + ticker := time.NewTicker(inat.MappingDuration) + for { + select { + case <-worker.Closing(): + ticker.Stop() + return + case <-ticker.C: + } + + listened, listenedErr := nmgr.net.InterfaceListenAddresses() + if listenedErr != nil { + log.Infof("Failed to get interface listen addresses: %v", listenedErr) + continue + } + // get local IP addresses + localIPs := parseLocalIPs(listened) + // get NAT mappings we already configured + mapped := nmgr.nat.Mappings() + // get observed addresses from peers + observed := nmgr.idService.OwnObservedAddrs() + // portsToMap are those external ports observed by peers, but not configured + portsToMap := parseExtraPorts(observed, mapped, localIPs) + // mappingsToClose are ports configured, but not observed by peers + mappingsToClose := parseExcessivePorts(observed, mapped) + + log.Debugf("observed %+v, listened %+v, listenedErr %+v, portsToMap %+v\n", + observed, listened, listenedErr, portsToMap) + + // remove observed addresses from `mappingNotSeenCount` to prevent them from being deleted + for _, addr := range observed { + delete(mappingNotSeenCount, addr.String()) + } + // maintain `mappingNotSeenCount`, close mappings not observed more than `closeMappingThreshold` times + for _, m := range mappingsToClose { + externalAddr, err := m.ExternalAddr() + if err != nil { + continue + } + mappingNotSeenCount[externalAddr.String()] += 1 + if mappingNotSeenCount[externalAddr.String()] > closeMappingThreshold { + m.Close() + delete(mappingNotSeenCount, externalAddr.String()) + } + } + // add port mappings for `portsToMap` + for _, port := range portsToMap { + for _, listenAddr := range nmgr.net.ListenAddresses() { + addPortMapping(nmgr, listenAddr, port) + } + } } }) } @@ -143,16 +307,18 @@ func (nmgr *natManager) NAT() *inat.NAT { return nmgr.nat } -func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { +func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr, externalPort int) { nat := nmgr.NAT() if nat == nil { panic("natManager addPortMapping called without a nat.") } // first, check if the port mapping already exists. - for _, mapping := range nat.Mappings() { - if mapping.InternalAddr().Equal(intaddr) { - return // it exists! return. + if externalPort == inat.RandomMappingExternalPort { + for _, mapping := range nat.Mappings() { + if mapping.InternalAddr().Equal(intaddr) { + return // it exists! return. + } } } @@ -173,7 +339,7 @@ func addPortMapping(nmgr *natManager, intaddr ma.Multiaddr) { defer log.EventBegin(ctx, "natMgrAddPortMapping", lm).Done() // get the nat - m, err := nat.NewMapping(intaddr) + m, err := nat.NewMapping(intaddr, externalPort) if err != nil { lm["outcome"] = "failure" lm["error"] = err @@ -223,7 +389,7 @@ func (nn *nmgrNetNotifiee) Listen(n inet.Network, addr ma.Multiaddr) { return // not ready or doesnt exist. } - addPortMapping(nn.natManager(), addr) + addPortMapping(nn.natManager(), addr, inat.RandomMappingExternalPort) } func (nn *nmgrNetNotifiee) ListenClose(n inet.Network, addr ma.Multiaddr) { diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index fb949851e0..0da6984628 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -205,7 +205,7 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { lmaddrs = append(lmaddrs, maddr) } - // if the address reported by the connection roughly matches their annoucned + // if the address reported by the connection roughly matches their announced // listener addresses, its likely to be an external NAT address if HasConsistentTransport(c.RemoteMultiaddr(), lmaddrs) { lmaddrs = append(lmaddrs, c.RemoteMultiaddr()) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 0e72ab3433..8b471c0ab8 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -14,7 +14,7 @@ const ActivationThresh = 4 // We only use addresses that: // - have been observed at least 4 times in last 1h. (counter symmetric nats) // - have been observed at least once recently (1h), because our position in the -// network, or network port mapppings, may have changed. +// network, or network port mappings, may have changed. type ObservedAddr struct { Addr ma.Multiaddr SeenBy map[string]time.Time