From 581a3d932511d0b5f5262662ffd2f92024229997 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 11 Jul 2014 19:30:20 +1000 Subject: [PATCH] Add DHT support to cmd/torrent --- client.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ cmd/torrent/main.go | 31 ++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/client.go b/client.go index 0c3cdee4..21128da6 100644 --- a/client.go +++ b/client.go @@ -16,6 +16,7 @@ Simple example: package torrent import ( + "bitbucket.org/anacrolix/go.torrent/dht" "bitbucket.org/anacrolix/go.torrent/util" "bufio" "container/list" @@ -105,6 +106,7 @@ type Client struct { Listener net.Listener DisableTrackers bool DownloadStrategy DownloadStrategy + DHT *dht.Server mu sync.Mutex event sync.Cond @@ -865,6 +867,9 @@ func (me *Client) addTorrent(t *torrent) (err error) { if !me.DisableTrackers { go me.announceTorrent(t) } + if me.DHT != nil { + go me.announceTorrentDHT(t) + } return } @@ -904,6 +909,47 @@ func (cl *Client) listenerAnnouncePort() (port int16) { return } +func (cl *Client) announceTorrentDHT(t *torrent) { + for { + ps, err := cl.DHT.GetPeers(string(t.InfoHash[:])) + if err != nil { + log.Printf("error getting peers from dht: %s", err) + return + } + nextScrape := time.After(1 * time.Minute) + getPeers: + for { + select { + case <-nextScrape: + break getPeers + case cps, ok := <-ps.Values: + if !ok { + break getPeers + } + err = cl.AddPeers(t.InfoHash, func() (ret []Peer) { + for _, cp := range cps { + ret = append(ret, Peer{ + IP: cp.IP[:], + Port: int(cp.Port), + }) + log.Printf("peer from dht: %s", &net.UDPAddr{ + IP: cp.IP[:], + Port: int(cp.Port), + }) + } + return + }()) + if err != nil { + log.Printf("error adding peers from dht for torrent %q: %s", t, err) + break getPeers + } + log.Printf("got %d peers from dht for torrent %q", len(cps), t) + } + } + ps.Close() + } +} + func (cl *Client) announceTorrent(t *torrent) { req := tracker.AnnounceRequest{ Event: tracker.Started, diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index 62b5472e..f98b73e0 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -1,6 +1,7 @@ package main import ( + "bitbucket.org/anacrolix/go.torrent/dht" "flag" "fmt" "log" @@ -42,10 +43,40 @@ func main() { if *httpAddr != "" { go http.ListenAndServe(*httpAddr, nil) } + dhtServer := &dht.Server{ + Socket: func() *net.UDPConn { + addr, err := net.ResolveUDPAddr("udp4", *listenAddr) + if err != nil { + log.Fatalf("error resolving dht listen addr: %s", err) + } + s, err := net.ListenUDP("udp4", addr) + if err != nil { + log.Fatalf("error creating dht socket: %s", err) + } + return s + }(), + } + err := dhtServer.Init() + if err != nil { + log.Fatalf("error initing dht server: %s", err) + } + go func() { + err := dhtServer.Serve() + if err != nil { + log.Fatalf("error serving dht: %s", err) + } + }() + go func() { + err := dhtServer.Bootstrap() + if err != nil { + log.Printf("error bootstrapping dht server: %s", err) + } + }() client := torrent.Client{ DataDir: *downloadDir, Listener: makeListener(), DisableTrackers: *disableTrackers, + DHT: dhtServer, } http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { client.WriteStatus(w)