Use netip.AddrPort in PEX code and filter unusable addrs much sooner

This commit is contained in:
Matt Joiner 2023-05-16 17:55:53 +10:00
parent 9be2500729
commit 5f2b09a818
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
6 changed files with 73 additions and 57 deletions

View File

@ -43,3 +43,10 @@ func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) {
return netip.ParseAddrPort(pra.String()) return netip.ParseAddrPort(pra.String())
} }
} }
func krpcNodeAddrFromAddrPort(addrPort netip.AddrPort) krpc.NodeAddr {
return krpc.NodeAddr{
IP: addrPort.Addr().AsSlice(),
Port: int(addrPort.Port()),
}
}

View File

@ -1072,10 +1072,15 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr {
return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort)) return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort))
} }
func (c *PeerConn) pexEvent(t pexEventType) pexEvent { func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) {
f := c.pexPeerFlags() f := c.pexPeerFlags()
addr := c.dialAddr() dialAddr := c.dialAddr()
return pexEvent{t, addr, f, nil} addr, err := addrPortFromPeerRemoteAddr(dialAddr)
if err != nil || !addr.IsValid() {
err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err)
return
}
return pexEvent{t, addr, f, nil}, nil
} }
func (c *PeerConn) String() string { func (c *PeerConn) String() string {

View File

@ -181,6 +181,7 @@ func TestConnPexPeerFlags(t *testing.T) {
} }
func TestConnPexEvent(t *testing.T) { func TestConnPexEvent(t *testing.T) {
c := qt.New(t)
var ( var (
udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
@ -195,27 +196,39 @@ func TestConnPexEvent(t *testing.T) {
{ {
pexAdd, pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil}, pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil},
}, },
{ {
pexDrop, pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port}, &PeerConn{
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil}, Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true},
PeerListenPort: dialTcpAddr.Port,
},
pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil},
}, },
{ {
pexAdd, pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port}, &PeerConn{
pexEvent{pexAdd, dialTcpAddr, 0, nil}, Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()},
PeerListenPort: dialTcpAddr.Port,
},
pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil},
}, },
{ {
pexDrop, pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port}, &PeerConn{
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil}, Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()},
PeerListenPort: dialUdpAddr.Port,
},
pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil},
}, },
} }
for i, tc := range testcases { for i, tc := range testcases {
e := tc.c.pexEvent(tc.t) c.Run(fmt.Sprintf("%v", i), func(c *qt.C) {
require.EqualValues(t, tc.e, e, i) e, err := tc.c.pexEvent(tc.t)
c.Assert(err, qt.IsNil)
c.Check(e, qt.Equals, tc.e)
})
} }
} }

43
pex.go
View File

@ -2,10 +2,9 @@ package torrent
import ( import (
"net" "net"
"net/netip"
"sync" "sync"
"github.com/anacrolix/dht/v2/krpc"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
) )
@ -26,7 +25,7 @@ const (
// represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event
type pexEvent struct { type pexEvent struct {
t pexEventType t pexEventType
addr PeerRemoteAddr addr netip.AddrPort
f pp.PexPeerFlags f pp.PexPeerFlags
next *pexEvent // event feed list next *pexEvent // event feed list
} }
@ -34,8 +33,8 @@ type pexEvent struct {
// facilitates efficient de-duplication while generating PEX messages // facilitates efficient de-duplication while generating PEX messages
type pexMsgFactory struct { type pexMsgFactory struct {
msg pp.PexMsg msg pp.PexMsg
added map[addrKey]struct{} added map[netip.AddrPort]struct{}
dropped map[addrKey]struct{} dropped map[netip.AddrPort]struct{}
} }
func (me *pexMsgFactory) DeltaLen() int { func (me *pexMsgFactory) DeltaLen() int {
@ -44,11 +43,11 @@ func (me *pexMsgFactory) DeltaLen() int {
int64(len(me.dropped)))) int64(len(me.dropped))))
} }
type addrKey string type addrKey = netip.AddrPort
// Returns the key to use to identify a given addr in the factory. // Returns the key to use to identify a given addr in the factory.
func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey { func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey {
return addrKey(addr.String()) return addr
} }
// Returns whether the entry was added (we can check if we're cancelling out another entry and so // Returns whether the entry was added (we can check if we're cancelling out another entry and so
@ -61,10 +60,7 @@ func (me *pexMsgFactory) add(e pexEvent) {
if me.added == nil { if me.added == nil {
me.added = make(map[addrKey]struct{}, pexMaxDelta) me.added = make(map[addrKey]struct{}, pexMaxDelta)
} }
addr, ok := nodeAddr(e.addr) addr := krpcNodeAddrFromAddrPort(e.addr)
if !ok {
return
}
m := &me.msg m := &me.msg
switch { switch {
case addr.IP.To4() != nil: case addr.IP.To4() != nil:
@ -96,10 +92,7 @@ func (me *pexMsgFactory) add(e pexEvent) {
// Returns whether the entry was added (we can check if we're cancelling out another entry and so // Returns whether the entry was added (we can check if we're cancelling out another entry and so
// won't hit the limit consuming this event). // won't hit the limit consuming this event).
func (me *pexMsgFactory) drop(e pexEvent) { func (me *pexMsgFactory) drop(e pexEvent) {
addr, ok := nodeAddr(e.addr) addr := krpcNodeAddrFromAddrPort(e.addr)
if !ok {
return
}
key := me.addrKey(e.addr) key := me.addrKey(e.addr)
if me.dropped == nil { if me.dropped == nil {
me.dropped = make(map[addrKey]struct{}, pexMaxDelta) me.dropped = make(map[addrKey]struct{}, pexMaxDelta)
@ -148,14 +141,6 @@ func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
return &me.msg return &me.msg
} }
// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
// format.
func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) {
ipport, _ := tryIpPortFromNetAddr(addr)
ok := ipport.IP != nil
return krpc.NodeAddr{IP: ipport.IP, Port: ipport.Port}, ok
}
// Per-torrent PEX state // Per-torrent PEX state
type pexState struct { type pexState struct {
sync.RWMutex sync.RWMutex
@ -184,6 +169,10 @@ func (s *pexState) append(e *pexEvent) {
} }
func (s *pexState) Add(c *PeerConn) { func (s *pexState) Add(c *PeerConn) {
e, err := c.pexEvent(pexAdd)
if err != nil {
return
}
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
s.nc++ s.nc++
@ -194,7 +183,6 @@ func (s *pexState) Add(c *PeerConn) {
} }
s.hold = s.hold[:0] s.hold = s.hold[:0]
} }
e := c.pexEvent(pexAdd)
c.pex.Listed = true c.pex.Listed = true
s.append(&e) s.append(&e)
} }
@ -204,9 +192,12 @@ func (s *pexState) Drop(c *PeerConn) {
// skip connections which were not previously Added // skip connections which were not previously Added
return return
} }
e, err := c.pexEvent(pexDrop)
if err != nil {
return
}
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
e := c.pexEvent(pexDrop)
s.nc-- s.nc--
if s.nc < pexTargAdded && len(s.hold) < pexMaxHold { if s.nc < pexTargAdded && len(s.hold) < pexMaxHold {
s.hold = append(s.hold, e) s.hold = append(s.hold, e)

View File

@ -47,12 +47,12 @@ func TestPexReset(t *testing.T) {
require.EqualValues(t, targ, s) require.EqualValues(t, targ, s)
} }
func mustNodeAddr(addr net.Addr) krpc.NodeAddr { func krpcNodeAddrFromNetAddr(addr net.Addr) krpc.NodeAddr {
ret, ok := nodeAddr(addr) addrPort, err := addrPortFromPeerRemoteAddr(addr)
if !ok { if err != nil {
panic(addr) panic(err)
} }
return ret return krpcNodeAddrFromAddrPort(addrPort)
} }
var testcases = []struct { var testcases = []struct {
@ -99,13 +99,13 @@ var testcases = []struct {
}(), }(),
targ: pp.PexMsg{ targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{ Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]), krpcNodeAddrFromNetAddr(addrs[2]),
mustNodeAddr(addrs[3]), krpcNodeAddrFromNetAddr(addrs[3]),
}, },
AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0}, AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0},
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]), krpcNodeAddrFromNetAddr(addrs[0]),
mustNodeAddr(addrs[1]), krpcNodeAddrFromNetAddr(addrs[1]),
}, },
Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn}, Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn},
}, },
@ -120,10 +120,10 @@ var testcases = []struct {
}(), }(),
targ: pp.PexMsg{ targ: pp.PexMsg{
Dropped: krpc.CompactIPv4NodeAddrs{ Dropped: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]), krpcNodeAddrFromNetAddr(addrs[2]),
}, },
Dropped6: krpc.CompactIPv6NodeAddrs{ Dropped6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]), krpcNodeAddrFromNetAddr(addrs[0]),
}, },
}, },
}, },
@ -144,7 +144,7 @@ var testcases = []struct {
}(), }(),
targ: pp.PexMsg{ targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]), krpcNodeAddrFromNetAddr(addrs[1]),
}, },
Added6Flags: []pp.PexPeerFlags{0}, Added6Flags: []pp.PexPeerFlags{0},
}, },
@ -168,12 +168,12 @@ var testcases = []struct {
}(), }(),
targ: pp.PexMsg{ targ: pp.PexMsg{
Added: krpc.CompactIPv4NodeAddrs{ Added: krpc.CompactIPv4NodeAddrs{
mustNodeAddr(addrs[2]), krpcNodeAddrFromNetAddr(addrs[2]),
}, },
AddedFlags: []pp.PexPeerFlags{0}, AddedFlags: []pp.PexPeerFlags{0},
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]), krpcNodeAddrFromNetAddr(addrs[0]),
mustNodeAddr(addrs[1]), krpcNodeAddrFromNetAddr(addrs[1]),
}, },
Added6Flags: []pp.PexPeerFlags{0, 0}, Added6Flags: []pp.PexPeerFlags{0, 0},
}, },
@ -193,7 +193,7 @@ var testcases = []struct {
}(), }(),
targ: pp.PexMsg{ targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]), krpcNodeAddrFromNetAddr(addrs[1]),
}, },
Added6Flags: []pp.PexPeerFlags{0}, Added6Flags: []pp.PexPeerFlags{0},
}, },
@ -207,7 +207,7 @@ var testcases = []struct {
}(), }(),
targ: pp.PexMsg{ targ: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[0]), krpcNodeAddrFromNetAddr(addrs[0]),
}, },
Added6Flags: []pp.PexPeerFlags{0}, Added6Flags: []pp.PexPeerFlags{0},
}, },
@ -216,7 +216,7 @@ var testcases = []struct {
}, },
targ1: pp.PexMsg{ targ1: pp.PexMsg{
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addrs[1]), krpcNodeAddrFromNetAddr(addrs[1]),
}, },
Added6Flags: []pp.PexPeerFlags{0}, Added6Flags: []pp.PexPeerFlags{0},
}, },

View File

@ -53,7 +53,7 @@ func TestPexConnState(t *testing.T) {
Added: krpc.CompactIPv4NodeAddrs(nil), Added: krpc.CompactIPv4NodeAddrs(nil),
AddedFlags: []pp.PexPeerFlags{}, AddedFlags: []pp.PexPeerFlags{},
Added6: krpc.CompactIPv6NodeAddrs{ Added6: krpc.CompactIPv6NodeAddrs{
mustNodeAddr(addr), krpcNodeAddrFromNetAddr(addr),
}, },
Added6Flags: []pp.PexPeerFlags{0}, Added6Flags: []pp.PexPeerFlags{0},
} }