diff --git a/torrent.go b/torrent.go index a343f10a..9e253492 100644 --- a/torrent.go +++ b/torrent.go @@ -8,7 +8,6 @@ import ( "io" "math" "math/rand" - "net" "net/url" "os" "sync" @@ -1319,55 +1318,38 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest { // Adds peers revealed in an announce until the announce ends, or we have // enough peers. -func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { +func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) { cl := t.cl - // Count all the unique addresses we got during this announce. - allAddrs := make(map[string]struct{}) - for { - select { - case v, ok := <-pvs: - if !ok { - return + for v := range pvs { + cl.lock() + for _, cp := range v.Peers { + if cp.Port == 0 { + // Can't do anything with this. + continue } - addPeers := make([]Peer, 0, len(v.Peers)) - for _, cp := range v.Peers { - if cp.Port == 0 { - // Can't do anything with this. - continue - } - addPeers = append(addPeers, Peer{ - IP: cp.IP[:], - Port: cp.Port, - Source: peerSourceDHTGetPeers, - }) - key := (&net.UDPAddr{ - IP: cp.IP[:], - Port: cp.Port, - }).String() - allAddrs[key] = struct{}{} - } - cl.lock() - t.addPeers(addPeers) - numPeers := t.peers.Len() - cl.unlock() - if numPeers >= cl.config.TorrentPeersHighWater { - return - } - case <-t.closed.LockedChan(cl.locker()): - return + t.addPeer(Peer{ + IP: cp.IP[:], + Port: cp.Port, + Source: peerSourceDHTGetPeers, + }) } + cl.unlock() } } -func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) { - cl := t.cl - ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort) +func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error { + ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort) if err != nil { - return + return err + } + go t.consumeDhtAnnouncePeers(ps.Peers) + select { + case <-t.closed.LockedChan(t.cl.locker()): + case <-time.After(5 * time.Minute): + case <-t.wantPeersEvent.LockedChan(t.cl.locker()): } - t.consumeDHTAnnounce(ps.Peers) ps.Close() - return + return nil } func (t *Torrent) dhtAnnouncer(s *dht.Server) { @@ -1378,7 +1360,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { case <-t.closed.LockedChan(cl.locker()): return } - err := t.announceDHT(true, s) + err := t.announceToDht(true, s) func() { cl.lock() defer cl.unlock() @@ -1388,11 +1370,6 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { t.logger.Printf("error announcing %q to DHT: %s", t, err) } }() - select { - case <-t.closed.LockedChan(cl.locker()): - return - case <-time.After(5 * time.Minute): - } } }