diff --git a/pex.go b/pex.go index 83376d48..a808af2b 100644 --- a/pex.go +++ b/pex.go @@ -3,6 +3,7 @@ package torrent import ( "net" "sync" + "time" "github.com/anacrolix/dht/v2/krpc" pp "github.com/anacrolix/torrent/peer_protocol" @@ -179,6 +180,7 @@ func shortestIP(ip net.IP) net.IP { type pexState struct { ev []pexEvent // event feed, append-only hold []pexEvent // delayed drops + rest time.Time // cooldown deadline on inbound nc int // net number of alive conns initCache pexMsgFactory // last generated initial message initSeq int // number of events which went into initCache @@ -190,6 +192,7 @@ func (s *pexState) Reset() { s.ev = nil s.hold = nil s.nc = 0 + s.rest = time.Time{} s.initLock.Lock() s.initCache = pexMsgFactory{} s.initSeq = 0 diff --git a/pexconn.go b/pexconn.go index b3719ec8..2940c4d2 100644 --- a/pexconn.go +++ b/pexconn.go @@ -88,6 +88,15 @@ func (s *pexConnState) Share(postfn messageWriter) bool { // Recv is called from the reader goroutine func (s *pexConnState) Recv(payload []byte) error { + if !s.torrent.wantPeers() { + s.dbg.Printf("peer reserve ok, incoming PEX discarded") + return nil + } + if time.Now().Before(s.torrent.pex.rest) { + s.dbg.Printf("in cooldown period, incoming PEX discarded") + return nil + } + rx, err := pp.LoadPexMsg(payload) if err != nil { return fmt.Errorf("error unmarshalling PEX message: %s", err) @@ -100,8 +109,10 @@ func (s *pexConnState) Recv(payload []byte) error { peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) s.dbg.Printf("adding %d peers from PEX", len(peers)) - s.torrent.addPeers(peers) - // s.dbg.Print("known swarm now:", s.torrent.KnownSwarm()) + if len(peers) > 0 { + s.torrent.pex.rest = time.Now().Add(pexInterval) + s.torrent.addPeers(peers) + } // one day we may also want to: // - check if the peer is not flooding us with PEX updates