From 5f2b09a81889ff2f6555f8c2b177a933528ef029 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 16 May 2023 17:55:53 +1000 Subject: [PATCH] Use netip.AddrPort in PEX code and filter unusable addrs much sooner --- netip-addrport.go | 7 +++++++ peerconn.go | 11 ++++++++--- peerconn_test.go | 31 ++++++++++++++++++++++--------- pex.go | 43 +++++++++++++++++-------------------------- pex_test.go | 36 ++++++++++++++++++------------------ pexconn_test.go | 2 +- 6 files changed, 73 insertions(+), 57 deletions(-) diff --git a/netip-addrport.go b/netip-addrport.go index 4e2dc94d..a57ea206 100644 --- a/netip-addrport.go +++ b/netip-addrport.go @@ -43,3 +43,10 @@ func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) { return netip.ParseAddrPort(pra.String()) } } + +func krpcNodeAddrFromAddrPort(addrPort netip.AddrPort) krpc.NodeAddr { + return krpc.NodeAddr{ + IP: addrPort.Addr().AsSlice(), + Port: int(addrPort.Port()), + } +} diff --git a/peerconn.go b/peerconn.go index 99157583..a9b835ac 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1072,10 +1072,15 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr { return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort)) } -func (c *PeerConn) pexEvent(t pexEventType) pexEvent { +func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) { f := c.pexPeerFlags() - addr := c.dialAddr() - return pexEvent{t, addr, f, nil} + dialAddr := c.dialAddr() + addr, err := addrPortFromPeerRemoteAddr(dialAddr) + if err != nil || !addr.IsValid() { + err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err) + return + } + return pexEvent{t, addr, f, nil}, nil } func (c *PeerConn) String() string { diff --git a/peerconn_test.go b/peerconn_test.go index e418e07e..a107976b 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -181,6 +181,7 @@ func TestConnPexPeerFlags(t *testing.T) { } func TestConnPexEvent(t *testing.T) { + c := qt.New(t) var ( udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848} tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848} @@ -195,27 +196,39 @@ func TestConnPexEvent(t *testing.T) { { pexAdd, &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, - pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil}, + pexEvent{pexAdd, udpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port}, - pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil}, + &PeerConn{ + Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, + PeerListenPort: dialTcpAddr.Port, + }, + pexEvent{pexDrop, tcpAddr.AddrPort(), pp.PexOutgoingConn, nil}, }, { pexAdd, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port}, - pexEvent{pexAdd, dialTcpAddr, 0, nil}, + &PeerConn{ + Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, + PeerListenPort: dialTcpAddr.Port, + }, + pexEvent{pexAdd, dialTcpAddr.AddrPort(), 0, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port}, - pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil}, + &PeerConn{ + Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, + PeerListenPort: dialUdpAddr.Port, + }, + pexEvent{pexDrop, dialUdpAddr.AddrPort(), pp.PexSupportsUtp, nil}, }, } for i, tc := range testcases { - e := tc.c.pexEvent(tc.t) - require.EqualValues(t, tc.e, e, i) + c.Run(fmt.Sprintf("%v", i), func(c *qt.C) { + e, err := tc.c.pexEvent(tc.t) + c.Assert(err, qt.IsNil) + c.Check(e, qt.Equals, tc.e) + }) } } diff --git a/pex.go b/pex.go index c5ed6099..4770f3d3 100644 --- a/pex.go +++ b/pex.go @@ -2,10 +2,9 @@ package torrent import ( "net" + "net/netip" "sync" - "github.com/anacrolix/dht/v2/krpc" - pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -26,7 +25,7 @@ const ( // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event type pexEvent struct { t pexEventType - addr PeerRemoteAddr + addr netip.AddrPort f pp.PexPeerFlags next *pexEvent // event feed list } @@ -34,8 +33,8 @@ type pexEvent struct { // facilitates efficient de-duplication while generating PEX messages type pexMsgFactory struct { msg pp.PexMsg - added map[addrKey]struct{} - dropped map[addrKey]struct{} + added map[netip.AddrPort]struct{} + dropped map[netip.AddrPort]struct{} } func (me *pexMsgFactory) DeltaLen() int { @@ -44,11 +43,11 @@ func (me *pexMsgFactory) DeltaLen() int { int64(len(me.dropped)))) } -type addrKey string +type addrKey = netip.AddrPort // Returns the key to use to identify a given addr in the factory. -func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey { - return addrKey(addr.String()) +func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey { + return addr } // Returns whether the entry was added (we can check if we're cancelling out another entry and so @@ -61,10 +60,7 @@ func (me *pexMsgFactory) add(e pexEvent) { if me.added == nil { me.added = make(map[addrKey]struct{}, pexMaxDelta) } - addr, ok := nodeAddr(e.addr) - if !ok { - return - } + addr := krpcNodeAddrFromAddrPort(e.addr) m := &me.msg switch { case addr.IP.To4() != nil: @@ -96,10 +92,7 @@ func (me *pexMsgFactory) add(e pexEvent) { // Returns whether the entry was added (we can check if we're cancelling out another entry and so // won't hit the limit consuming this event). func (me *pexMsgFactory) drop(e pexEvent) { - addr, ok := nodeAddr(e.addr) - if !ok { - return - } + addr := krpcNodeAddrFromAddrPort(e.addr) key := me.addrKey(e.addr) if me.dropped == nil { me.dropped = make(map[addrKey]struct{}, pexMaxDelta) @@ -148,14 +141,6 @@ func (me *pexMsgFactory) PexMsg() *pp.PexMsg { return &me.msg } -// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr -// format. -func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) { - ipport, _ := tryIpPortFromNetAddr(addr) - ok := ipport.IP != nil - return krpc.NodeAddr{IP: ipport.IP, Port: ipport.Port}, ok -} - // Per-torrent PEX state type pexState struct { sync.RWMutex @@ -184,6 +169,10 @@ func (s *pexState) append(e *pexEvent) { } func (s *pexState) Add(c *PeerConn) { + e, err := c.pexEvent(pexAdd) + if err != nil { + return + } s.Lock() defer s.Unlock() s.nc++ @@ -194,7 +183,6 @@ func (s *pexState) Add(c *PeerConn) { } s.hold = s.hold[:0] } - e := c.pexEvent(pexAdd) c.pex.Listed = true s.append(&e) } @@ -204,9 +192,12 @@ func (s *pexState) Drop(c *PeerConn) { // skip connections which were not previously Added return } + e, err := c.pexEvent(pexDrop) + if err != nil { + return + } s.Lock() defer s.Unlock() - e := c.pexEvent(pexDrop) s.nc-- if s.nc < pexTargAdded && len(s.hold) < pexMaxHold { s.hold = append(s.hold, e) diff --git a/pex_test.go b/pex_test.go index 7d8f6d5d..089e0df2 100644 --- a/pex_test.go +++ b/pex_test.go @@ -47,12 +47,12 @@ func TestPexReset(t *testing.T) { require.EqualValues(t, targ, s) } -func mustNodeAddr(addr net.Addr) krpc.NodeAddr { - ret, ok := nodeAddr(addr) - if !ok { - panic(addr) +func krpcNodeAddrFromNetAddr(addr net.Addr) krpc.NodeAddr { + addrPort, err := addrPortFromPeerRemoteAddr(addr) + if err != nil { + panic(err) } - return ret + return krpcNodeAddrFromAddrPort(addrPort) } var testcases = []struct { @@ -99,13 +99,13 @@ var testcases = []struct { }(), targ: pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs{ - mustNodeAddr(addrs[2]), - mustNodeAddr(addrs[3]), + krpcNodeAddrFromNetAddr(addrs[2]), + krpcNodeAddrFromNetAddr(addrs[3]), }, AddedFlags: []pp.PexPeerFlags{pp.PexOutgoingConn, 0}, Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[0]), - mustNodeAddr(addrs[1]), + krpcNodeAddrFromNetAddr(addrs[0]), + krpcNodeAddrFromNetAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{0, pp.PexOutgoingConn}, }, @@ -120,10 +120,10 @@ var testcases = []struct { }(), targ: pp.PexMsg{ Dropped: krpc.CompactIPv4NodeAddrs{ - mustNodeAddr(addrs[2]), + krpcNodeAddrFromNetAddr(addrs[2]), }, Dropped6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[0]), + krpcNodeAddrFromNetAddr(addrs[0]), }, }, }, @@ -144,7 +144,7 @@ var testcases = []struct { }(), targ: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[1]), + krpcNodeAddrFromNetAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{0}, }, @@ -168,12 +168,12 @@ var testcases = []struct { }(), targ: pp.PexMsg{ Added: krpc.CompactIPv4NodeAddrs{ - mustNodeAddr(addrs[2]), + krpcNodeAddrFromNetAddr(addrs[2]), }, AddedFlags: []pp.PexPeerFlags{0}, Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[0]), - mustNodeAddr(addrs[1]), + krpcNodeAddrFromNetAddr(addrs[0]), + krpcNodeAddrFromNetAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{0, 0}, }, @@ -193,7 +193,7 @@ var testcases = []struct { }(), targ: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[1]), + krpcNodeAddrFromNetAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{0}, }, @@ -207,7 +207,7 @@ var testcases = []struct { }(), targ: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[0]), + krpcNodeAddrFromNetAddr(addrs[0]), }, Added6Flags: []pp.PexPeerFlags{0}, }, @@ -216,7 +216,7 @@ var testcases = []struct { }, targ1: pp.PexMsg{ Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addrs[1]), + krpcNodeAddrFromNetAddr(addrs[1]), }, Added6Flags: []pp.PexPeerFlags{0}, }, diff --git a/pexconn_test.go b/pexconn_test.go index 603398c5..f8b9c9e0 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -53,7 +53,7 @@ func TestPexConnState(t *testing.T) { Added: krpc.CompactIPv4NodeAddrs(nil), AddedFlags: []pp.PexPeerFlags{}, Added6: krpc.CompactIPv6NodeAddrs{ - mustNodeAddr(addr), + krpcNodeAddrFromNetAddr(addr), }, Added6Flags: []pp.PexPeerFlags{0}, }