Locate target peer using dial addr when receiving a holepunch rendezvous

This commit is contained in:
Matt Joiner 2023-05-01 12:30:06 +10:00
parent d1845bbede
commit 5703f9b5eb
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
5 changed files with 62 additions and 18 deletions

View File

@ -86,7 +86,6 @@ type (
peerChoking bool
peerRequests map[Request]*peerRequestState
PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
PeerListenPort int
// The highest possible number of pieces the torrent could have based on
// communication with the peer. Generally only useful until we have the
// torrent info.

View File

@ -15,10 +15,12 @@ import (
"time"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/generics"
. "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
"golang.org/x/exp/maps"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
@ -40,6 +42,7 @@ type PeerConn struct {
// See BEP 3 etc.
PeerID PeerID
PeerExtensionBytes pp.PeerExtensionBits
PeerListenPort int
// The actual Conn, used for closing, and setting socket options. Do not use methods on this
// while holding any mutexes.
@ -75,11 +78,25 @@ func (cn *PeerConn) pexStatus() string {
if !cn.supportsExtension(pp.ExtensionNamePex) {
return "unsupported"
}
return fmt.Sprintf(
"%v conns, %v unsent events",
len(cn.pex.remoteLiveConns),
cn.pex.numPending(),
)
if true {
return fmt.Sprintf(
"%v conns, %v unsent events",
len(cn.pex.remoteLiveConns),
cn.pex.numPending(),
)
} else {
// This alternative branch prints out the remote live conn addresses.
return fmt.Sprintf(
"%v conns, %v unsent events",
strings.Join(generics.SliceMap(
maps.Keys(cn.pex.remoteLiveConns),
func(from netip.AddrPort) string {
return from.String()
}), ","),
cn.pex.numPending(),
)
}
}
func (cn *PeerConn) peerImplStatusLines() []string {
@ -1049,6 +1066,8 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr {
dialAddr := *addr
dialAddr.Port = c.PeerListenPort
return &dialAddr
default:
panic(addr)
}
}
return c.RemoteAddr
@ -1082,6 +1101,11 @@ func (pc *PeerConn) remoteAddrPort() Option[netip.AddrPort] {
}).AddrPort())
}
func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) {
dialAddr := pc.dialAddr()
return addrPortFromPeerRemoteAddr(dialAddr)
}
func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
}

View File

@ -200,17 +200,17 @@ func TestConnPexEvent(t *testing.T) {
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
},
{
pexAdd,
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexAdd, dialTcpAddr, 0, nil},
},
{
pexDrop,
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
},
}

View File

@ -2002,8 +2002,9 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
}
t.conns[c] = struct{}{}
t.cl.event.Broadcast()
// We'll never receive the "p" extended handshake parameter.
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
t.pex.Add(c) // as no further extended handshake expected
t.pex.Add(c)
}
return nil
}
@ -2678,10 +2679,13 @@ func (t *Torrent) checkValidReceiveChunk(r Request) error {
return nil
}
func (t *Torrent) peerConnsWithRemoteAddrPort(addrPort netip.AddrPort) (ret []*PeerConn) {
func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
for pc := range t.conns {
addr := pc.remoteAddrPort()
if !(addr.Ok && addr.Value == addrPort) {
dialAddr, err := pc.remoteDialAddrPort()
if err != nil {
continue
}
if dialAddr != target {
continue
}
ret = append(ret, pc)
@ -2725,7 +2729,18 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer
case utHolepunch.Rendezvous:
t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
sendMsg := sendUtHolepunchMsg
targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort)
senderAddrPort, err := sender.remoteDialAddrPort()
if err != nil {
sender.logger.Levelf(
log.Warning,
"error getting ut_holepunch rendezvous sender's dial address: %v",
err,
)
// There's no better error code. The sender's address itself is invalid. I don't see
// this error message being appropriate anywhere else anyway.
sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
}
targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
if len(targets) == 0 {
sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
return nil
@ -2736,7 +2751,7 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer
continue
}
sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0)
sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
}
return nil
case utHolepunch.Connect:

View File

@ -6,6 +6,8 @@ import (
"testing"
"testing/iotest"
"github.com/anacrolix/log"
"github.com/anacrolix/torrent/internal/testutil"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
@ -23,8 +25,9 @@ func TestHolepunchConnect(t *testing.T) {
cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.DataDir = greetingTempDir
cfg.DisablePEX = true
//cfg.Debug = true
cfg.Debug = true
cfg.AcceptPeerConnections = false
//cfg.DisableUTP = true
seeder, err := NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
@ -51,7 +54,7 @@ func TestHolepunchConnect(t *testing.T) {
cfg.Seed = false
cfg.DataDir = t.TempDir()
cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.Debug = true
//cfg.Debug = true
//cfg.DisableUTP = true
leecherLeecher, _ := NewClient(cfg)
require.NoError(t, err)
@ -85,9 +88,12 @@ func TestHolepunchConnect(t *testing.T) {
waitForConns(seederTorrent)
go llg.AddClientPeer(leecher)
waitForConns(llg)
//time.Sleep(time.Second)
llg.cl.lock()
targetAddr := seeder.ListenAddrs()[1]
log.Printf("trying to initiate to %v", targetAddr)
llg.initiateConn(PeerInfo{
Addr: seeder.ListenAddrs()[0],
Addr: targetAddr,
}, true, false)
llg.cl.unlock()
wg.Wait()