Rate limit received PEX messages per connection
This commit is contained in:
parent
1e13625c73
commit
6818a9f773
@ -874,7 +874,11 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
|
||||
if !c.pex.IsEnabled() {
|
||||
return nil // or hang-up maybe?
|
||||
}
|
||||
return c.pex.Recv(payload)
|
||||
err = c.pex.Recv(payload)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("receiving pex message: %w", err)
|
||||
}
|
||||
return
|
||||
default:
|
||||
return fmt.Errorf("unexpected extended message ID: %v", id)
|
||||
}
|
||||
|
3
pex.go
3
pex.go
@ -3,7 +3,6 @@ package torrent
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/dht/v2/krpc"
|
||||
|
||||
@ -162,7 +161,6 @@ type pexState struct {
|
||||
sync.RWMutex
|
||||
tail *pexEvent // event feed list
|
||||
hold []pexEvent // delayed drops
|
||||
rest time.Time // cooldown deadline on inbound
|
||||
nc int // net number of alive conns
|
||||
msg0 pexMsgFactory // initial message
|
||||
}
|
||||
@ -174,7 +172,6 @@ func (s *pexState) Reset() {
|
||||
s.tail = nil
|
||||
s.hold = nil
|
||||
s.nc = 0
|
||||
s.rest = time.Time{}
|
||||
s.msg0 = pexMsgFactory{}
|
||||
}
|
||||
|
||||
|
23
pexconn.go
23
pexconn.go
@ -31,6 +31,7 @@ type pexConnState struct {
|
||||
dbg log.Logger
|
||||
// Running record of live connections the remote end of the connection purports to have.
|
||||
remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
|
||||
lastRecv time.Time
|
||||
}
|
||||
|
||||
func (s *pexConnState) IsEnabled() bool {
|
||||
@ -131,26 +132,20 @@ func (s *pexConnState) Recv(payload []byte) error {
|
||||
s.dbg.Printf("received pex message: %v", rx)
|
||||
torrent.Add("pex added peers received", int64(len(rx.Added)))
|
||||
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
|
||||
s.updateRemoteLiveConns(rx)
|
||||
|
||||
if !s.torrent.wantPeers() {
|
||||
s.dbg.Printf("peer reserve ok, incoming PEX discarded")
|
||||
return nil
|
||||
}
|
||||
// TODO: This should be per conn, not for the whole Torrent.
|
||||
if time.Now().Before(s.torrent.pex.rest) {
|
||||
s.dbg.Printf("in cooldown period, incoming PEX discarded")
|
||||
return nil
|
||||
// "Clients must batch updates to send no more than 1 PEX message per minute."
|
||||
timeSinceLastRecv := time.Since(s.lastRecv)
|
||||
if timeSinceLastRecv < 45*time.Second {
|
||||
return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
|
||||
}
|
||||
s.lastRecv = time.Now()
|
||||
s.updateRemoteLiveConns(rx)
|
||||
|
||||
var peers peerInfos
|
||||
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
|
||||
peers.AppendFromPex(rx.Added, rx.AddedFlags)
|
||||
s.dbg.Printf("adding %d peers from PEX", len(peers))
|
||||
if len(peers) > 0 {
|
||||
s.torrent.pex.rest = time.Now().Add(pexInterval)
|
||||
s.torrent.addPeers(peers)
|
||||
}
|
||||
added := s.torrent.addPeers(peers)
|
||||
s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
|
||||
|
||||
// one day we may also want to:
|
||||
// - check if the peer is not flooding us with PEX updates
|
||||
|
Loading…
x
Reference in New Issue
Block a user