Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvements for crawling dht peers #59

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
57 changes: 47 additions & 10 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Config struct {
// If true, the node will read the routing table from disk on startup and save routing
// table snapshots on disk every few minutes. Default value: true.
SaveRoutingTable bool
// do not reply to any incoming queries
PassiveMode bool
// How often to save the routing table to disk. Default value: 5 minutes.
SavePeriod time.Duration
// Maximum packets per second to be processed. Disabled if negative. Default value: 100.
Expand All @@ -86,13 +88,21 @@ type Config struct {
// single peer contact typically consumes 6 bytes. Default value: 256.
MaxInfoHashPeers int
// ClientPerMinuteLimit protects against spammy clients. Ignore their requests if exceeded
// this number of packets per minute. Default value: 50.
// this number of packets per minute. Default value: 50. (0 to disable)
ClientPerMinuteLimit int
// ThrottlerTrackedClients is the number of hosts the client throttler remembers. An LRU is used to
// track the most interesting ones. Default value: 1000.
ThrottlerTrackedClients int64
//Protocol for UDP connections, udp4= IPv4, udp6 = IPv6
UDPProto string
// Maximum get_peer requests per infoHash to prevent infinity loop in case NumTargetPeers is set
// real high.
MaxSearchQueries int
// MaxSearchQueries counter will be reset after that time
SearchCntExpire time.Duration
// If a node replies to get_peer requests for more than MaxNodeDownloads different InfoHashes it's
// spammy and we blacklist it
MaxNodeDownloads int
}

// Creates a *Config populated with default values.
Expand All @@ -101,17 +111,21 @@ func NewConfig() *Config {
Address: "",
Port: 0, // Picks a random port.
NumTargetPeers: 5,
DHTRouters: "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
DHTRouters: "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881,router.utorrent.com:6881,dht.aelitis.com:6881,dht.libtorrent.org:25401",
MaxNodes: 500,
CleanupPeriod: 15 * time.Minute,
SaveRoutingTable: true,
PassiveMode: false,
SavePeriod: 5 * time.Minute,
RateLimit: 100,
MaxInfoHashes: 2048,
MaxInfoHashPeers: 256,
ClientPerMinuteLimit: 50,
ThrottlerTrackedClients: 1000,
UDPProto: "udp4",
MaxSearchQueries: -1,
SearchCntExpire: 10 * time.Minute,
MaxNodeDownloads: -1,
}
}

Expand Down Expand Up @@ -180,7 +194,7 @@ func New(config *Config) (node *DHT, err error) {
node = &DHT{
config: cfg,
routingTable: newRoutingTable(),
peerStore: newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers),
peerStore: newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers, cfg.MaxNodes, cfg.SearchCntExpire),
PeersRequestResults: make(chan map[InfoHash][]string, 1),
stop: make(chan bool),
exploredNeighborhood: false,
Expand Down Expand Up @@ -245,6 +259,8 @@ type ihReq struct {
// is just a router that doesn't downloads torrents.
func (d *DHT) PeersRequest(ih string, announce bool) {
d.peersRequest <- ihReq{InfoHash(ih), announce}
// reset searchCount on new request from bt client
d.peerStore.resetSearchCount(InfoHash(ih))
log.V(2).Infof("DHT: torrent client asking more peers for %x.", ih)
}

Expand Down Expand Up @@ -550,11 +566,6 @@ func (d *DHT) helloFromPeer(addr string) {

func (d *DHT) processPacket(p packetType) {
log.V(5).Infof("DHT processing packet from %v", p.raddr.String())
if !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
totalPacketsFromBlockedHosts.Add(1)
log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
return
}
if p.b[0] != 'd' {
// Malformed DHT packet. There are protocol extensions out
// there that we don't support or understand.
Expand All @@ -563,7 +574,7 @@ func (d *DHT) processPacket(p packetType) {
}
r, err := readResponse(p)
if err != nil {
log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
//log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
return
}
switch {
Expand Down Expand Up @@ -636,6 +647,11 @@ func (d *DHT) processPacket(p packetType) {
log.V(3).Infof("DHT: Unknown query id: %v", r.T)
}
case r.Y == "q":
if d.config.ClientPerMinuteLimit > 0 && !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
totalPacketsFromBlockedHosts.Add(1)
log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
return
}
if r.A.Id == d.nodeId {
log.V(3).Infof("DHT received packet from self, id %x", r.A.Id)
return
Expand All @@ -651,6 +667,10 @@ func (d *DHT) processPacket(p packetType) {
d.ping(addr)
}
}
// don't reply to any queries if in passiveMode
if d.config.PassiveMode {
return
}
log.V(5).Infof("DHT processing %v request", r.Q)
switch r.Q {
case "ping":
Expand Down Expand Up @@ -692,6 +712,11 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
if r == nil {
return
}
// if MaxSearchQueries is set and reached, don't send new get_peers
cnt := d.peerStore.addSearchCount(ih)
if d.config.MaxSearchQueries > 0 && cnt > d.config.MaxSearchQueries {
return
}
totalSentGetPeers.Add(1)
ty := "get_peers"
transId := r.newQuery(ty)
Expand Down Expand Up @@ -926,10 +951,22 @@ func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
peers = append(peers, peerContact)
}
if len(peers) > 0 {
if !node.activeDownloads[query.ih] {
node.activeDownloads[query.ih] = true
}
// If a node replies to get_peer requests for more than
// MaxNodeDownloads different InfoHashes it's spammy
// and we ignore it
if d.config.MaxNodeDownloads > 0 && len(node.activeDownloads) > d.config.MaxNodeDownloads {
log.Warningf("killing spammy node %s with activeDownloads: %d\n", nettools.BinaryToDottedPort(node.addressBinaryFormat), len(node.activeDownloads))
d.routingTable.addBadNode(node.address.String())
d.routingTable.kill(node, d.peerStore)
return
}
// Finally, new peers.
result := map[InfoHash][]string{query.ih: peers}
totalPeers.Add(int64(len(peers)))
log.V(2).Infof("DHT: processGetPeerResults, totalPeers: %v", totalPeers.String())
log.V(2).Infof("DHT: processGetPeerResults, %s peers: %d, totalPeers: %v from %s", query.ih, len(peers), totalPeers.String(),nettools.BinaryToDottedPort(node.addressBinaryFormat))
select {
case d.PeersRequestResults <- result:
case <-d.stop:
Expand Down
6 changes: 3 additions & 3 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ExampleDHT() {
return
}

infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
if err != nil {
fmt.Printf("DecodeInfoHash faiure: %v", err)
return
Expand Down Expand Up @@ -63,7 +63,7 @@ M:
// fmt.Println(DecodePeerAddress(peer))
//}

if fmt.Sprintf("%x", ih) == "d1c5676ae7ac98e8b19f63565905105e3c4c37a2" {
if fmt.Sprintf("%x", ih) == "c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca" {
fmt.Println("Peer found for the requested infohash or the test was skipped")
return
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestDHTLocal(t *testing.T) {
return
}
searchRetryPeriod = time.Second
infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
if err != nil {
t.Fatalf(err.Error())
}
Expand Down
102 changes: 102 additions & 0 deletions examples/find_infohash_and_wait/btkador.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Runs a node on a random UDP port that attempts to collect 10 peers for an
// infohash, then keeps running as a passive DHT node.
//
// IMPORTANT: if the UDP port is not reachable from the public internet, you
// may see very few results.
//
// To collect 10 peers, it usually has to contact some 1k nodes. It's much easier
// to find peers for popular infohashes. This process is not instant and should
// take a minute or two, depending on your network connection.
//
//
// There is a builtin web server that can be used to collect debugging stats
// from http://localhost:8711/debug/vars.
package main

import (
"flag"
"fmt"
"net/http"
"os"
"time"

"github.com/btkador/dht"
)

const (
httpPortTCP = 8711
numTarget = 5000
exampleIH = "C3C5FE05C329AE51C6ECA464F6B30BA0A457B2CA" // ubuntu-16.04.5-desktop-amd64.iso
)

func main() {
flag.Parse()
// To see logs, use the -logtostderr flag and change the verbosity with
// -v 0 (less verbose) up to -v 5 (more verbose).
if len(flag.Args()) != 1 {
fmt.Fprintf(os.Stderr, "Usage: %v <infohash>\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Example infohash: %v\n", exampleIH)
flag.PrintDefaults()
os.Exit(1)
}
ih, err := dht.DecodeInfoHash(flag.Args()[0])
if err != nil {
fmt.Fprintf(os.Stderr, "DecodeInfoHash error: %v\n", err)
os.Exit(1)
}
// Starts a DHT node with the default options. It picks a random UDP port. To change this, see dht.NewConfig.
d, err := dht.New(&dht.Config{
Address: "",
Port: 0, // Picks a random port.
NumTargetPeers: 5000,
DHTRouters: "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
MaxNodes: 500,
CleanupPeriod: 15 * time.Minute,
SaveRoutingTable: true,
SavePeriod: 5 * time.Minute,
RateLimit: -1,
MaxInfoHashes: 2048,
MaxInfoHashPeers: 256,
ClientPerMinuteLimit: 50,
ThrottlerTrackedClients: 1000,
UDPProto: "udp4",
MaxSearchQueries: 1000,
})
if err != nil {
fmt.Fprintf(os.Stderr, "New DHT error: %v", err)
os.Exit(1)

}
// For debugging.
go http.ListenAndServe(fmt.Sprintf(":%d", httpPortTCP), nil)

if err = d.Start(); err != nil {
fmt.Fprintf(os.Stderr, "DHT start error: %v", err)
os.Exit(1)
}
go drainresults(d)

for {
d.PeersRequest(string(ih), false)
time.Sleep(5 * time.Minute)
}
}

// drainresults loops, printing the address of nodes it has found.
func drainresults(n *dht.DHT) {
count := 0
fmt.Println("=========================== DHT")
fmt.Println("Note that there are many bad nodes that reply to anything you ask.")
fmt.Println("Peers found:")
for r := range n.PeersRequestResults {
for _, peers := range r {
for _, x := range peers {
fmt.Printf("%d: %v\n", count, dht.DecodePeerAddress(x))
count++
//if count >= numTarget {
// os.Exit(0)
//}
}
}
}
}
98 changes: 98 additions & 0 deletions examples/find_infohash_and_wait/findroguenode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Runs a node on a random UDP port that attempts to collect peers for random
// generated infohashes, to find nodes that resonde to just any get_peers query.
//
// IMPORTANT: if the UDP port is not reachable from the public internet, you
// may see very few results.
//
//
// There is a builtin web server that can be used to collect debugging stats
// from http://localhost:8711/debug/vars.
package main

import (
"flag"
"fmt"
"net/http"
"os"
"time"

"github.com/btkador/dht"

"crypto/rand"
"encoding/hex"
)

const (
httpPortTCP = 8711
)

func main() {
flag.Parse()
// Starts a DHT node with the default options. It picks a random UDP port. To change this, see dht.NewConfig.
d, err := dht.New(&dht.Config{
Address: "",
Port: 0, // Picks a random port.
NumTargetPeers: 5,
DHTRouters: "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
MaxNodes: 500,
CleanupPeriod: 15 * time.Minute,
SaveRoutingTable: true,
SavePeriod: 5 * time.Minute,
RateLimit: -1,
MaxInfoHashes: 2048,
MaxInfoHashPeers: 256,
ClientPerMinuteLimit: 50,
ThrottlerTrackedClients: 1000,
UDPProto: "udp4",
MaxSearchQueries: -1,
MaxNodeDownloads: 1,
})
if err != nil {
fmt.Fprintf(os.Stderr, "New DHT error: %v", err)
os.Exit(1)

}
// For debugging.
go http.ListenAndServe(fmt.Sprintf(":%d", httpPortTCP), nil)

if err = d.Start(); err != nil {
fmt.Fprintf(os.Stderr, "DHT start error: %v", err)
os.Exit(1)
}
go drainresults(d)

for {
ih, err := dht.DecodeInfoHash(randomHex(20))
if err != nil {
fmt.Fprintf(os.Stderr, "DecodeInfoHash error: %v\n", err)
os.Exit(1)
}
d.PeersRequest(string(ih), false)
time.Sleep(5 * time.Second)
}
}

// drainresults loops, printing the address of nodes it has found.
func drainresults(n *dht.DHT) {
count := 0
fmt.Println("=========================== DHT")
fmt.Println("Note that there are many bad nodes that reply to anything you ask.")
fmt.Println("Peers found:")
for r := range n.PeersRequestResults {
for _, peers := range r {
for _, x := range peers {
fmt.Printf("%d: %v\n", count, dht.DecodePeerAddress(x))
count++
}
}
}
}


func randomHex(n int) string {
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return ""
}
return hex.EncodeToString(bytes)
}
Loading