169 lines
4.3 KiB
Go
169 lines
4.3 KiB
Go
package torrent
|
|
|
|
import (
|
|
"fmt"
|
|
"net/netip"
|
|
"time"
|
|
|
|
g "github.com/anacrolix/generics"
|
|
"github.com/anacrolix/log"
|
|
|
|
pp "github.com/anacrolix/torrent/peer_protocol"
|
|
)
|
|
|
|
const (
|
|
pexRetryDelay = 10 * time.Second
|
|
pexInterval = 1 * time.Minute
|
|
)
|
|
|
|
// per-connection PEX state
|
|
type pexConnState struct {
|
|
enabled bool
|
|
xid pp.ExtensionNumber
|
|
last *pexEvent
|
|
timer *time.Timer
|
|
gate chan struct{}
|
|
readyfn func()
|
|
torrent *Torrent
|
|
Listed bool
|
|
info log.Logger
|
|
dbg log.Logger
|
|
// Running record of live connections the remote end of the connection purports to have.
|
|
remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
|
|
lastRecv time.Time
|
|
}
|
|
|
|
func (s *pexConnState) IsEnabled() bool {
|
|
return s.enabled
|
|
}
|
|
|
|
// Init is called from the reader goroutine upon the extended handshake completion
|
|
func (s *pexConnState) Init(c *PeerConn) {
|
|
xid, ok := c.PeerExtensionIDs[pp.ExtensionNamePex]
|
|
if !ok || xid == 0 || c.t.cl.config.DisablePEX {
|
|
return
|
|
}
|
|
s.xid = xid
|
|
s.last = nil
|
|
s.torrent = c.t
|
|
s.info = c.t.cl.logger.WithDefaultLevel(log.Info)
|
|
s.dbg = c.logger.WithDefaultLevel(log.Debug)
|
|
s.readyfn = c.tickleWriter
|
|
s.gate = make(chan struct{}, 1)
|
|
s.timer = time.AfterFunc(0, func() {
|
|
s.gate <- struct{}{}
|
|
s.readyfn() // wake up the writer
|
|
})
|
|
s.enabled = true
|
|
}
|
|
|
|
// schedule next PEX message
|
|
func (s *pexConnState) sched(delay time.Duration) {
|
|
s.timer.Reset(delay)
|
|
}
|
|
|
|
// generate next PEX message for the peer; returns nil if nothing yet to send
|
|
func (s *pexConnState) genmsg() *pp.PexMsg {
|
|
tx, last := s.torrent.pex.Genmsg(s.last)
|
|
if tx.Len() == 0 {
|
|
return nil
|
|
}
|
|
s.last = last
|
|
return &tx
|
|
}
|
|
|
|
func (s *pexConnState) numPending() int {
|
|
if s.torrent == nil {
|
|
return 0
|
|
}
|
|
return s.torrent.pex.numPending(s.last)
|
|
}
|
|
|
|
// Share is called from the writer goroutine if when it is woken up with the write buffers empty
|
|
// Returns whether there's more room on the send buffer to write to.
|
|
func (s *pexConnState) Share(postfn messageWriter) bool {
|
|
select {
|
|
case <-s.gate:
|
|
if tx := s.genmsg(); tx != nil {
|
|
s.dbg.Print("sending PEX message: ", tx)
|
|
flow := postfn(tx.Message(s.xid))
|
|
s.sched(pexInterval)
|
|
return flow
|
|
} else {
|
|
// no PEX to send this time - try again shortly
|
|
s.sched(pexRetryDelay)
|
|
}
|
|
default:
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
|
|
for _, dropped := range rx.Dropped {
|
|
addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
|
|
delete(s.remoteLiveConns, addrPort)
|
|
}
|
|
for _, dropped := range rx.Dropped6 {
|
|
addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
|
|
delete(s.remoteLiveConns, addrPort)
|
|
}
|
|
for i, added := range rx.Added {
|
|
addr := netip.AddrFrom4(*(*[4]byte)(added.IP.To4()))
|
|
addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
|
|
flags := g.SliceGet(rx.AddedFlags, i)
|
|
g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
|
|
}
|
|
for i, added := range rx.Added6 {
|
|
addr := netip.AddrFrom16(*(*[16]byte)(added.IP.To16()))
|
|
addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
|
|
flags := g.SliceGet(rx.Added6Flags, i)
|
|
g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Recv is called from the reader goroutine
|
|
func (s *pexConnState) Recv(payload []byte) error {
|
|
rx, err := pp.LoadPexMsg(payload)
|
|
if err != nil {
|
|
return fmt.Errorf("unmarshalling pex message: %w", err)
|
|
}
|
|
s.dbg.Printf("received pex message: %v", rx)
|
|
torrent.Add("pex added peers received", int64(len(rx.Added)))
|
|
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
|
|
|
|
// "Clients must batch updates to send no more than 1 PEX message per minute."
|
|
timeSinceLastRecv := time.Since(s.lastRecv)
|
|
if timeSinceLastRecv < 45*time.Second {
|
|
return fmt.Errorf("last received only %v ago", timeSinceLastRecv)
|
|
}
|
|
s.lastRecv = time.Now()
|
|
s.updateRemoteLiveConns(rx)
|
|
|
|
var peers peerInfos
|
|
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
|
|
peers.AppendFromPex(rx.Added, rx.AddedFlags)
|
|
if time.Now().Before(s.torrent.pex.rest) {
|
|
s.dbg.Printf("in cooldown period, incoming PEX discarded")
|
|
return nil
|
|
}
|
|
added := s.torrent.addPeers(peers)
|
|
s.dbg.Printf("got %v peers over pex, added %v", len(peers), added)
|
|
|
|
if len(peers) > 0 {
|
|
s.torrent.pex.rest = time.Now().Add(pexInterval)
|
|
}
|
|
|
|
// one day we may also want to:
|
|
// - handle drops somehow
|
|
// - detect malicious peers
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *pexConnState) Close() {
|
|
if s.timer != nil {
|
|
s.timer.Stop()
|
|
}
|
|
}
|