diff --git a/dht.go b/dht.go
index 5828395..a315390 100644
--- a/dht.go
+++ b/dht.go
@@ -74,6 +74,8 @@ type Config struct {
 	// If true, the node will read the routing table from disk on startup and save routing
 	// table snapshots on disk every few minutes. Default value: true.
 	SaveRoutingTable bool
+	// If true, the node does not reply to any incoming queries (passive mode).
+	PassiveMode bool
 	// How often to save the routing table to disk. Default value: 5 minutes.
 	SavePeriod time.Duration
 	// Maximum packets per second to be processed. Disabled if negative. Default value: 100.
@@ -86,13 +88,21 @@ type Config struct {
 	// single peer contact typically consumes 6 bytes. Default value: 256.
 	MaxInfoHashPeers int
 	// ClientPerMinuteLimit protects against spammy clients. Ignore their requests if exceeded
-	// this number of packets per minute. Default value: 50.
+	// this number of packets per minute. Default value: 50. (0 to disable)
 	ClientPerMinuteLimit int
 	// ThrottlerTrackedClients is the number of hosts the client throttler remembers. An LRU is used to
 	// track the most interesting ones. Default value: 1000.
 	ThrottlerTrackedClients int64
 	//Protocol for UDP connections, udp4= IPv4, udp6 = IPv6
 	UDPProto string
+	// Maximum number of get_peers queries sent per infohash, to prevent an endless
+	// search when NumTargetPeers is set very high.
+	MaxSearchQueries int
+	// The per-infohash MaxSearchQueries counter is reset after this period.
+	SearchCntExpire time.Duration
+	// If a node replies to get_peers requests for more than MaxNodeDownloads different
+	// InfoHashes, it is considered spammy and gets blacklisted.
+	MaxNodeDownloads int
 }
 
 // Creates a *Config populated with default values.
@@ -101,10 +111,11 @@ func NewConfig() *Config {
 		Address:                 "",
 		Port:                    0, // Picks a random port.
 		NumTargetPeers:          5,
-		DHTRouters:              "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
+		DHTRouters:              "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881,router.utorrent.com:6881,dht.aelitis.com:6881,dht.libtorrent.org:25401",
 		MaxNodes:                500,
 		CleanupPeriod:           15 * time.Minute,
 		SaveRoutingTable:        true,
+		PassiveMode:             false,
 		SavePeriod:              5 * time.Minute,
 		RateLimit:               100,
 		MaxInfoHashes:           2048,
@@ -112,6 +123,9 @@ func NewConfig() *Config {
 		ClientPerMinuteLimit:    50,
 		ThrottlerTrackedClients: 1000,
 		UDPProto:                "udp4",
+		MaxSearchQueries:        -1,
+		SearchCntExpire:         10 * time.Minute,
+		MaxNodeDownloads:        -1,
 	}
 }
 
@@ -180,7 +194,7 @@ func New(config *Config) (node *DHT, err error) {
 	node = &DHT{
 		config:               cfg,
 		routingTable:         newRoutingTable(),
-		peerStore:            newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers),
+		peerStore:            newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers, cfg.MaxNodes, cfg.SearchCntExpire),
 		PeersRequestResults:  make(chan map[InfoHash][]string, 1),
 		stop:                 make(chan bool),
 		exploredNeighborhood: false,
@@ -245,6 +259,8 @@ type ihReq struct {
 // is just a router that doesn't downloads torrents.
 func (d *DHT) PeersRequest(ih string, announce bool) {
 	d.peersRequest <- ihReq{InfoHash(ih), announce}
+	// Reset the per-infohash search counter on a new request from the torrent client.
+	d.peerStore.resetSearchCount(InfoHash(ih))
 	log.V(2).Infof("DHT: torrent client asking more peers for %x.", ih)
 }
 
@@ -550,11 +566,6 @@ func (d *DHT) helloFromPeer(addr string) {
 func (d *DHT) processPacket(p packetType) {
 	log.V(5).Infof("DHT processing packet from %v", p.raddr.String())
 
-	if !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
-		totalPacketsFromBlockedHosts.Add(1)
-		log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
-		return
-	}
 	if p.b[0] != 'd' {
 		// Malformed DHT packet. There are protocol extensions out
 		// there that we don't support or understand.
@@ -563,7 +574,7 @@ func (d *DHT) processPacket(p packetType) {
 	}
 	r, err := readResponse(p)
 	if err != nil {
-		log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
+		//log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
 		return
 	}
 	switch {
@@ -636,6 +647,11 @@ func (d *DHT) processPacket(p packetType) {
 			log.V(3).Infof("DHT: Unknown query id: %v", r.T)
 		}
 	case r.Y == "q":
+		if d.config.ClientPerMinuteLimit > 0 && !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
+			totalPacketsFromBlockedHosts.Add(1)
+			log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
+			return
+		}
 		if r.A.Id == d.nodeId {
 			log.V(3).Infof("DHT received packet from self, id %x", r.A.Id)
 			return
@@ -651,6 +667,10 @@ func (d *DHT) processPacket(p packetType) {
 				d.ping(addr)
 			}
 		}
+		// Don't reply to any queries when running in passive mode.
+		if d.config.PassiveMode {
+			return
+		}
 		log.V(5).Infof("DHT processing %v request", r.Q)
 		switch r.Q {
 		case "ping":
@@ -692,6 +712,11 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
 	if r == nil {
 		return
 	}
+	// If MaxSearchQueries is set and the limit has been reached, don't send more get_peers queries.
+	cnt := d.peerStore.addSearchCount(ih)
+	if d.config.MaxSearchQueries > 0 && cnt > d.config.MaxSearchQueries {
+		return
+	}
 	totalSentGetPeers.Add(1)
 	ty := "get_peers"
 	transId := r.newQuery(ty)
@@ -926,10 +951,22 @@ func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
 			peers = append(peers, peerContact)
 		}
 		if len(peers) > 0 {
			+			if !node.activeDownloads[query.ih] {
+				node.activeDownloads[query.ih] = true
+			}
+			// If a node replies to get_peers requests for more than
+			// MaxNodeDownloads different InfoHashes, it is considered
+			// spammy and we ignore it.
+			if d.config.MaxNodeDownloads > 0 && len(node.activeDownloads) > d.config.MaxNodeDownloads {
+				log.Warningf("killing spammy node %s with activeDownloads: %d\n", nettools.BinaryToDottedPort(node.addressBinaryFormat), len(node.activeDownloads))
+				d.routingTable.addBadNode(node.address.String())
+				d.routingTable.kill(node, d.peerStore)
+				return
+			}
 			// Finally, new peers.
 			result := map[InfoHash][]string{query.ih: peers}
 			totalPeers.Add(int64(len(peers)))
-			log.V(2).Infof("DHT: processGetPeerResults, totalPeers: %v", totalPeers.String())
+			log.V(2).Infof("DHT: processGetPeerResults, %x peers: %d, totalPeers: %v from %s", query.ih, len(peers), totalPeers.String(), nettools.BinaryToDottedPort(node.addressBinaryFormat))
 			select {
 			case d.PeersRequestResults <- result:
 			case <-d.stop:
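Taken together, the dht.go changes above add four tuning knobs (PassiveMode, MaxSearchQueries, SearchCntExpire, MaxNodeDownloads) and move the per-client throttle so that it only applies to incoming queries, and only when ClientPerMinuteLimit is positive. A passive node still sends its own queries and processes replies; it merely stops answering ping/find_node/get_peers/announce_peer from other nodes. Below is a minimal sketch of how a caller might combine these options; the values are illustrative only, and the import path follows the examples added later in this patch:

package main

import (
	"log"
	"time"

	"github.com/btkador/dht"
)

func main() {
	cfg := dht.NewConfig()
	cfg.PassiveMode = true                 // never answer incoming queries
	cfg.ClientPerMinuteLimit = 0           // 0 disables the per-client query throttle
	cfg.MaxSearchQueries = 1000            // cap get_peers queries sent per infohash
	cfg.SearchCntExpire = 10 * time.Minute // the per-infohash counter expires after this period
	cfg.MaxNodeDownloads = 5               // blacklist nodes returning peers for more than 5 infohashes
	d, err := dht.New(cfg)
	if err != nil {
		log.Fatalf("dht.New: %v", err)
	}
	if err := d.Start(); err != nil {
		log.Fatalf("dht.Start: %v", err)
	}
	// The same infohash used by the tests in this patch.
	ih, err := dht.DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
	if err != nil {
		log.Fatalf("DecodeInfoHash: %v", err)
	}
	// PeersRequest also resets the per-infohash search counter, so a periodic
	// re-request keeps the search alive even with MaxSearchQueries set.
	d.PeersRequest(string(ih), false)
	for result := range d.PeersRequestResults {
		log.Printf("received peers for %d infohash(es)", len(result))
	}
}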
diff --git a/dht_test.go b/dht_test.go
index 3993ad4..50f562b 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -31,7 +31,7 @@ func ExampleDHT() {
 		return
 	}
-	infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
+	infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
 	if err != nil {
 		fmt.Printf("DecodeInfoHash faiure: %v", err)
 		return
 	}
@@ -63,7 +63,7 @@ M:
 	// fmt.Println(DecodePeerAddress(peer))
 	//}
 
-	if fmt.Sprintf("%x", ih) == "d1c5676ae7ac98e8b19f63565905105e3c4c37a2" {
+	if fmt.Sprintf("%x", ih) == "c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca" {
 		fmt.Println("Peer found for the requested infohash or the test was skipped")
 		return
 	}
@@ -122,7 +122,7 @@ func TestDHTLocal(t *testing.T) {
 		return
 	}
 	searchRetryPeriod = time.Second
-	infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
+	infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
 	if err != nil {
 		t.Fatalf(err.Error())
 	}
diff --git a/examples/find_infohash_and_wait/btkador.go b/examples/find_infohash_and_wait/btkador.go
new file mode 100644
index 0000000..ea3f9b6
--- /dev/null
+++ b/examples/find_infohash_and_wait/btkador.go
@@ -0,0 +1,102 @@
+// Runs a node on a random UDP port that attempts to collect peers for an
+// infohash, then keeps running as a passive DHT node.
+//
+// IMPORTANT: if the UDP port is not reachable from the public internet, you
+// may see very few results.
+//
+// To collect peers, it usually has to contact some 1k nodes. It's much easier
+// to find peers for popular infohashes. This process is not instant and should
+// take a minute or two, depending on your network connection.
+//
+//
+// There is a builtin web server that can be used to collect debugging stats
+// from http://localhost:8711/debug/vars.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/btkador/dht"
+)
+
+const (
+	httpPortTCP = 8711
+	numTarget   = 5000
+	exampleIH   = "C3C5FE05C329AE51C6ECA464F6B30BA0A457B2CA" // ubuntu-16.04.5-desktop-amd64.iso
+)
+
+func main() {
+	flag.Parse()
+	// To see logs, use the -logtostderr flag and change the verbosity with
+	// -v 0 (less verbose) up to -v 5 (more verbose).
+	if len(flag.Args()) != 1 {
+		fmt.Fprintf(os.Stderr, "Usage: %v \n\n", os.Args[0])
+		fmt.Fprintf(os.Stderr, "Example infohash: %v\n", exampleIH)
+		flag.PrintDefaults()
+		os.Exit(1)
+	}
+	ih, err := dht.DecodeInfoHash(flag.Args()[0])
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "DecodeInfoHash error: %v\n", err)
+		os.Exit(1)
+	}
+	// Starts a DHT node with the default options. It picks a random UDP port. To change this, see dht.NewConfig.
+	d, err := dht.New(&dht.Config{
+		Address:                 "",
+		Port:                    0, // Picks a random port.
+		NumTargetPeers:          5000,
+		DHTRouters:              "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
+		MaxNodes:                500,
+		CleanupPeriod:           15 * time.Minute,
+		SaveRoutingTable:        true,
+		SavePeriod:              5 * time.Minute,
+		RateLimit:               -1,
+		MaxInfoHashes:           2048,
+		MaxInfoHashPeers:        256,
+		ClientPerMinuteLimit:    50,
+		ThrottlerTrackedClients: 1000,
+		UDPProto:                "udp4",
+		MaxSearchQueries:        1000,
+	})
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "New DHT error: %v", err)
+		os.Exit(1)
+
+	}
+	// For debugging.
+	go http.ListenAndServe(fmt.Sprintf(":%d", httpPortTCP), nil)
+
+	if err = d.Start(); err != nil {
+		fmt.Fprintf(os.Stderr, "DHT start error: %v", err)
+		os.Exit(1)
+	}
+	go drainresults(d)
+
+	for {
+		d.PeersRequest(string(ih), false)
+		time.Sleep(5 * time.Minute)
+	}
+}
+
+// drainresults loops, printing the address of nodes it has found.
+func drainresults(n *dht.DHT) {
+	count := 0
+	fmt.Println("=========================== DHT")
+	fmt.Println("Note that there are many bad nodes that reply to anything you ask.")
+	fmt.Println("Peers found:")
+	for r := range n.PeersRequestResults {
+		for _, peers := range r {
+			for _, x := range peers {
+				fmt.Printf("%d: %v\n", count, dht.DecodePeerAddress(x))
+				count++
+				//if count >= numTarget {
+				//	os.Exit(0)
+				//}
+			}
+		}
+	}
+}
diff --git a/examples/find_infohash_and_wait/findroguenode.go b/examples/find_infohash_and_wait/findroguenode.go
new file mode 100644
index 0000000..fb2785c
--- /dev/null
+++ b/examples/find_infohash_and_wait/findroguenode.go
@@ -0,0 +1,98 @@
+// Runs a node on a random UDP port that attempts to collect peers for randomly
+// generated infohashes, to find nodes that respond to just any get_peers query.
+//
+// IMPORTANT: if the UDP port is not reachable from the public internet, you
+// may see very few results.
+//
+//
+// There is a builtin web server that can be used to collect debugging stats
+// from http://localhost:8711/debug/vars.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/btkador/dht"
+
+	"crypto/rand"
+	"encoding/hex"
+)
+
+const (
+	httpPortTCP = 8711
+)
+
+func main() {
+	flag.Parse()
+	// Starts a DHT node with the default options. It picks a random UDP port. To change this, see dht.NewConfig.
+	d, err := dht.New(&dht.Config{
+		Address:                 "",
+		Port:                    0, // Picks a random port.
+		NumTargetPeers:          5,
+		DHTRouters:              "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
+		MaxNodes:                500,
+		CleanupPeriod:           15 * time.Minute,
+		SaveRoutingTable:        true,
+		SavePeriod:              5 * time.Minute,
+		RateLimit:               -1,
+		MaxInfoHashes:           2048,
+		MaxInfoHashPeers:        256,
+		ClientPerMinuteLimit:    50,
+		ThrottlerTrackedClients: 1000,
+		UDPProto:                "udp4",
+		MaxSearchQueries:        -1,
+		MaxNodeDownloads:        1,
+	})
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "New DHT error: %v", err)
+		os.Exit(1)
+
+	}
+	// For debugging.
+	go http.ListenAndServe(fmt.Sprintf(":%d", httpPortTCP), nil)
+
+	if err = d.Start(); err != nil {
+		fmt.Fprintf(os.Stderr, "DHT start error: %v", err)
+		os.Exit(1)
+	}
+	go drainresults(d)
+
+	for {
+		ih, err := dht.DecodeInfoHash(randomHex(20))
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "DecodeInfoHash error: %v\n", err)
+			os.Exit(1)
+		}
+		d.PeersRequest(string(ih), false)
+		time.Sleep(5 * time.Second)
+	}
+}
+
+// drainresults loops, printing the address of nodes it has found.
+func drainresults(n *dht.DHT) {
+	count := 0
+	fmt.Println("=========================== DHT")
+	fmt.Println("Note that there are many bad nodes that reply to anything you ask.")
+	fmt.Println("Peers found:")
+	for r := range n.PeersRequestResults {
+		for _, peers := range r {
+			for _, x := range peers {
+				fmt.Printf("%d: %v\n", count, dht.DecodePeerAddress(x))
+				count++
+			}
+		}
+	}
+}
+
+
+func randomHex(n int) string {
+	bytes := make([]byte, n)
+	if _, err := rand.Read(bytes); err != nil {
+		return ""
+	}
+	return hex.EncodeToString(bytes)
+}
diff --git a/krpc.go b/krpc.go
index ddaf365..8bda005 100644
--- a/krpc.go
+++ b/krpc.go
@@ -32,9 +32,10 @@ type remoteNode struct {
 	pendingQueries   map[string]*queryType // key: transaction ID
 	pastQueries      map[string]*queryType // key: transaction ID
 	reachable        bool
+	createTime       time.Time
 	lastResponseTime time.Time
 	lastSearchTime   time.Time
-	ActiveDownloads  []string // List of infohashes we know this peer is downloading.
+	activeDownloads  map[InfoHash]bool // List of infohashes we know this peer is downloading.
 }
 
 func newRemoteNode(addr net.UDPAddr, id string) *remoteNode {
@@ -44,8 +45,10 @@ func newRemoteNode(addr net.UDPAddr, id string) *remoteNode {
 		lastQueryID:    newTransactionId(),
 		id:             id,
 		reachable:      false,
+		createTime:     time.Now(),
 		pendingQueries: map[string]*queryType{},
 		pastQueries:    map[string]*queryType{},
+		activeDownloads: make(map[InfoHash]bool),
 	}
 }
 
diff --git a/krpc_test.go b/krpc_test.go
index 5414df1..368f70d 100644
--- a/krpc_test.go
+++ b/krpc_test.go
@@ -5,7 +5,7 @@ import (
 )
 
 func TestDecodeInfoHash(t *testing.T) {
-	infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
+	infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
 	if err != nil {
 		t.Fatalf("DecodeInfoHash faiure: %v", err)
 	}
diff --git a/neighborhood_test.go b/neighborhood_test.go
index a1c2c31..cb7cbb5 100644
--- a/neighborhood_test.go
+++ b/neighborhood_test.go
@@ -4,6 +4,7 @@ import (
 	"crypto/rand"
 	"net"
 	"testing"
+	"time"
 )
 
 const (
@@ -51,7 +52,7 @@ func TestUpkeep(t *testing.T) {
 		// there should be no sign of them later on.
 		n := randNodeId()
 		n[0] = byte(0x3d) // Ensure long distance.
-		r.neighborhoodUpkeep(genremoteNode(string(n)), "udp", newPeerStore(0, 0))
+		r.neighborhoodUpkeep(genremoteNode(string(n)), "udp", newPeerStore(0, 0, 0, 10 * time.Minute))
 	}
 
 	// Current state: 8 neighbors with low proximity.
@@ -59,7 +60,7 @@ func TestUpkeep(t *testing.T) {
 	// Adds 7 neighbors from the static table. They should replace the
 	// random ones, except for one.
 	for _, v := range table[1:8] {
-		r.neighborhoodUpkeep(genremoteNode(v.rid), "udp", newPeerStore(0, 0))
+		r.neighborhoodUpkeep(genremoteNode(v.rid), "udp", newPeerStore(0, 0, 0, 10 * time.Minute))
 	}
 
 	// Current state: 7 close neighbors, one distant dude.
@@ -80,7 +81,7 @@ func TestUpkeep(t *testing.T) {
 	if r.boundaryNode == nil {
 		t.Fatalf("tried to kill nil boundary node")
 	}
-	r.kill(r.boundaryNode, newPeerStore(0, 0))
+	r.kill(r.boundaryNode, newPeerStore(0, 0, 0, 10 * time.Minute))
 
 	// The resulting boundary neighbor should now be one from the static
 	// table, with high proximity.
diff --git a/peer_store.go b/peer_store.go
index e091424..8c788aa 100644
--- a/peer_store.go
+++ b/peer_store.go
@@ -5,6 +5,9 @@ import (
 
 	log "github.com/golang/glog"
 	"github.com/golang/groupcache/lru"
+
+	"github.com/patrickmn/go-cache"
+	"time"
 )
 
 // For the inner map, the key address in binary form. value=ignored.
@@ -124,12 +127,14 @@ func (p *peerContactsSet) Alive() int {
 	return ret
 }
 
-func newPeerStore(maxInfoHashes, maxInfoHashPeers int) *peerStore {
+func newPeerStore(maxInfoHashes, maxInfoHashPeers, maxNodes int, SearchCntExpire time.Duration) *peerStore {
 	return &peerStore{
 		infoHashPeers:        lru.New(maxInfoHashes),
 		localActiveDownloads: make(map[InfoHash]bool),
+		searchCount:          cache.New(SearchCntExpire, time.Duration(SearchCntExpire/2)),
 		maxInfoHashes:        maxInfoHashes,
 		maxInfoHashPeers:     maxInfoHashPeers,
+		maxNodes:             maxNodes,
 	}
 }
 
@@ -139,8 +144,10 @@ type peerStore struct {
 	infoHashPeers *lru.Cache
 	// infoHashes for which we are peers.
 	localActiveDownloads map[InfoHash]bool
+	searchCount          *cache.Cache
 	maxInfoHashes        int
 	maxInfoHashPeers     int
+	maxNodes             int
 }
 
 func (h *peerStore) get(ih InfoHash) *peerContactsSet {
@@ -225,3 +232,20 @@ func (h *peerStore) hasLocalDownload(ih InfoHash) bool {
 	log.V(3).Infof("hasLocalDownload for %x: %v", ih, ok)
 	return ok
 }
+
+// addSearchCount counts the get_peers searches performed per infohash.
+func (h *peerStore) addSearchCount(ih InfoHash) int {
+	cnt, err := h.searchCount.IncrementInt(string(ih), 1)
+	count := 1
+	if err == nil {
+		count = cnt
+	} else {
+		h.searchCount.Set(string(ih), int(1), cache.DefaultExpiration)
+	}
+	log.V(3).Infof("searchCount %x: %d", ih, count)
+	return count
+}
+
+func (h *peerStore) resetSearchCount(ih InfoHash) {
+	h.searchCount.Delete(string(ih))
+}
diff --git a/peer_store_test.go b/peer_store_test.go
index 59254c5..e7de54d 100644
--- a/peer_store_test.go
+++ b/peer_store_test.go
@@ -2,15 +2,16 @@ package dht
 
 import (
 	"testing"
+	"time"
 )
 
 func TestPeerStorage(t *testing.T) {
-	ih, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
+	ih, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
 	if err != nil {
 		t.Fatalf("DecodeInfoHash: %v", err)
 	}
 	// Allow 1 IH and 2 peers.
-	p := newPeerStore(1, 2)
+	p := newPeerStore(1, 2, 0, 10 * time.Minute)
 
 	if ok := p.addContact(ih, "abcedf"); !ok {
 		t.Fatalf("addContact(1/2) expected true, got false")
@@ -31,7 +32,7 @@ func TestPeerStorage(t *testing.T) {
 		t.Fatalf("Added 3rd contact, got count %v, wanted 2", p.count(ih))
 	}
 
-	ih2, err := DecodeInfoHash("deca7a89a1dbdc4b213de1c0d5351e92582f31fb")
+	ih2, err := DecodeInfoHash("e84213a794f3ccd890382a54a64ca68b7e925433")
 	if err != nil {
 		t.Fatalf("DecodeInfoHash: %v", err)
 	}
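The peerStore now keeps a per-infohash search counter in a github.com/patrickmn/go-cache cache, a new dependency introduced by this patch; entries expire after SearchCntExpire and the cache is swept at half that interval. The sketch below is a hypothetical test, not part of the patch, that documents the intended counter semantics: the first call for an infohash returns 1, later calls increment the count, and resetSearchCount (called from PeersRequest) starts over.

package dht

import (
	"testing"
	"time"
)

// Hypothetical test (not part of this patch) documenting the intended
// addSearchCount/resetSearchCount behavior.
func TestSearchCount(t *testing.T) {
	p := newPeerStore(1, 2, 0, 10*time.Minute)
	ih := InfoHash("01234567890123456789") // placeholder 20-byte infohash
	if got := p.addSearchCount(ih); got != 1 {
		t.Fatalf("first addSearchCount = %d, want 1", got)
	}
	if got := p.addSearchCount(ih); got != 2 {
		t.Fatalf("second addSearchCount = %d, want 2", got)
	}
	p.resetSearchCount(ih)
	if got := p.addSearchCount(ih); got != 1 {
		t.Fatalf("addSearchCount after reset = %d, want 1", got)
	}
}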
diff --git a/routing_table.go b/routing_table.go
index 9ae40cf..bd1d4d3 100644
--- a/routing_table.go
+++ b/routing_table.go
@@ -17,6 +17,7 @@ func newRoutingTable() *routingTable {
 		"",
 		nil,
 		0,
+		make(map[string]bool),
 	}
 }
 
@@ -33,6 +34,9 @@ type routingTable struct {
 	boundaryNode *remoteNode
 	// How many prefix bits are shared between boundaryNode and nodeId.
 	proximity int
+
+	// Blacklist of bad nodes, keyed by their host:port address.
+	badNodes map[string]bool
 }
 
 // hostPortToNode finds a node based on the specified hostPort specification,
@@ -138,6 +142,9 @@ func (r *routingTable) insert(node *remoteNode, proto string) error {
 	if existed {
 		return nil // fmt.Errorf("node already existed in routing table: %v", node.address.String())
 	}
+	if r.isBadNode(addr) {
+		return nil
+	}
 	r.addresses[addr] = node
 	// We don't know the ID of all nodes.
 	if !bogusId(node.id) {
@@ -179,6 +186,20 @@ func (r *routingTable) kill(n *remoteNode, p *peerStore) {
 	p.killContact(nettools.BinaryToDottedPort(n.addressBinaryFormat))
 }
 
+// addBadNode adds an address to the bad-node blacklist.
+func (r *routingTable) addBadNode(address string) {
+	log.V(3).Infof("addBadNode for %v: true", address)
+	r.badNodes[address] = true
+}
+
+// isBadNode reports whether an address is on the blacklist.
+func (r *routingTable) isBadNode(address string) bool {
+	_, ok := r.badNodes[address]
+	log.V(5).Infof("isBadNode for %v: %v", address, ok)
+	return ok
+}
+
+
 func (r *routingTable) resetNeighborhoodBoundary() {
 	r.proximity = 0
 	// Try to find a distant one within the neighborhood and promote it as
@@ -206,6 +227,12 @@ func (r *routingTable) cleanup(cleanupPeriod time.Duration, p *peerStore) (needP
 			r.kill(n, p)
 			continue
 		}
+		// Kill old, currently unused nodes when the table holds more than maxNodes entries.
+		if len(r.addresses) > p.maxNodes && time.Since(n.createTime) > cleanupPeriod && len(n.pendingQueries) == 0 {
+			log.V(4).Infof("DHT: Old node with 0 pendingQueries. Deleting")
+			r.kill(n, p)
+			continue
+		}
 		if n.reachable {
 			if len(n.pendingQueries) == 0 {
 				goto PING
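routing_table.go gains an in-memory blacklist: processGetPeerResults calls addBadNode for nodes flagged as spammy, and insert silently drops any address for which isBadNode is true. Note that, as written, the badNodes map is never pruned. A hypothetical test, not part of the patch, sketching the expected behavior:

package dht

import "testing"

// Hypothetical test (not part of this patch) documenting the expected
// addBadNode/isBadNode behavior.
func TestBadNodeBlacklist(t *testing.T) {
	r := newRoutingTable()
	addr := "1.2.3.4:6881"
	if r.isBadNode(addr) {
		t.Fatalf("fresh routing table should not blacklist %v", addr)
	}
	r.addBadNode(addr)
	if !r.isBadNode(addr) {
		t.Fatalf("expected %v to be blacklisted after addBadNode", addr)
	}
}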