diff --git a/peer.go b/peer.go index 4e2a24f6..1d8ead10 100644 --- a/peer.go +++ b/peer.go @@ -86,7 +86,6 @@ type ( peerChoking bool peerRequests map[Request]*peerRequestState PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake - PeerListenPort int // The highest possible number of pieces the torrent could have based on // communication with the peer. Generally only useful until we have the // torrent info. diff --git a/peerconn.go b/peerconn.go index 71a38925..d85d0d51 100644 --- a/peerconn.go +++ b/peerconn.go @@ -15,10 +15,12 @@ import ( "time" "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/generics" . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" + "golang.org/x/exp/maps" "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" @@ -40,6 +42,7 @@ type PeerConn struct { // See BEP 3 etc. PeerID PeerID PeerExtensionBytes pp.PeerExtensionBits + PeerListenPort int // The actual Conn, used for closing, and setting socket options. Do not use methods on this // while holding any mutexes. @@ -75,11 +78,25 @@ func (cn *PeerConn) pexStatus() string { if !cn.supportsExtension(pp.ExtensionNamePex) { return "unsupported" } - return fmt.Sprintf( - "%v conns, %v unsent events", - len(cn.pex.remoteLiveConns), - cn.pex.numPending(), - ) + if true { + return fmt.Sprintf( + "%v conns, %v unsent events", + len(cn.pex.remoteLiveConns), + cn.pex.numPending(), + ) + } else { + // This alternative branch prints out the remote live conn addresses. + return fmt.Sprintf( + "%v conns, %v unsent events", + strings.Join(generics.SliceMap( + maps.Keys(cn.pex.remoteLiveConns), + func(from netip.AddrPort) string { + return from.String() + }), ","), + cn.pex.numPending(), + ) + + } } func (cn *PeerConn) peerImplStatusLines() []string { @@ -1049,6 +1066,8 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr { dialAddr := *addr dialAddr.Port = c.PeerListenPort return &dialAddr + default: + panic(addr) } } return c.RemoteAddr @@ -1082,6 +1101,11 @@ func (pc *PeerConn) remoteAddrPort() Option[netip.AddrPort] { }).AddrPort()) } +func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) { + dialAddr := pc.dialAddr() + return addrPortFromPeerRemoteAddr(dialAddr) +} + func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool { return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit) } diff --git a/peerconn_test.go b/peerconn_test.go index c68ca6f8..f0a5a6fd 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -200,17 +200,17 @@ func TestConnPexEvent(t *testing.T) { }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}}, + &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port}, pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil}, }, { pexAdd, - &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}}, + &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port}, pexEvent{pexAdd, dialTcpAddr, 0, nil}, }, { pexDrop, - &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}}, + &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port}, pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil}, }, } diff --git a/torrent.go b/torrent.go index eef9c0cb..c7ca2549 100644 --- a/torrent.go +++ b/torrent.go @@ -2002,8 +2002,9 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { } t.conns[c] = struct{}{} t.cl.event.Broadcast() + // We'll never receive the "p" extended handshake parameter. if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { - t.pex.Add(c) // as no further extended handshake expected + t.pex.Add(c) } return nil } @@ -2678,10 +2679,13 @@ func (t *Torrent) checkValidReceiveChunk(r Request) error { return nil } -func (t *Torrent) peerConnsWithRemoteAddrPort(addrPort netip.AddrPort) (ret []*PeerConn) { +func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) { for pc := range t.conns { - addr := pc.remoteAddrPort() - if !(addr.Ok && addr.Value == addrPort) { + dialAddr, err := pc.remoteDialAddrPort() + if err != nil { + continue + } + if dialAddr != target { continue } ret = append(ret, pc) @@ -2725,7 +2729,18 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer case utHolepunch.Rendezvous: t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender) sendMsg := sendUtHolepunchMsg - targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort) + senderAddrPort, err := sender.remoteDialAddrPort() + if err != nil { + sender.logger.Levelf( + log.Warning, + "error getting ut_holepunch rendezvous sender's dial address: %v", + err, + ) + // There's no better error code. The sender's address itself is invalid. I don't see + // this error message being appropriate anywhere else anyway. + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer) + } + targets := t.peerConnsWithDialAddrPort(msg.AddrPort) if len(targets) == 0 { sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected) return nil @@ -2736,7 +2751,7 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer continue } sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0) - sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0) + sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0) } return nil case utHolepunch.Connect: diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go index 1b4b7d08..fe73e8d1 100644 --- a/ut-holepunching_test.go +++ b/ut-holepunching_test.go @@ -6,6 +6,8 @@ import ( "testing" "testing/iotest" + "github.com/anacrolix/log" + "github.com/anacrolix/torrent/internal/testutil" qt "github.com/frankban/quicktest" "github.com/stretchr/testify/assert" @@ -23,8 +25,9 @@ func TestHolepunchConnect(t *testing.T) { cfg.MaxAllocPeerRequestDataPerConn = 4 cfg.DataDir = greetingTempDir cfg.DisablePEX = true - //cfg.Debug = true + cfg.Debug = true cfg.AcceptPeerConnections = false + //cfg.DisableUTP = true seeder, err := NewClient(cfg) require.NoError(t, err) defer seeder.Close() @@ -51,7 +54,7 @@ func TestHolepunchConnect(t *testing.T) { cfg.Seed = false cfg.DataDir = t.TempDir() cfg.MaxAllocPeerRequestDataPerConn = 4 - cfg.Debug = true + //cfg.Debug = true //cfg.DisableUTP = true leecherLeecher, _ := NewClient(cfg) require.NoError(t, err) @@ -85,9 +88,12 @@ func TestHolepunchConnect(t *testing.T) { waitForConns(seederTorrent) go llg.AddClientPeer(leecher) waitForConns(llg) + //time.Sleep(time.Second) llg.cl.lock() + targetAddr := seeder.ListenAddrs()[1] + log.Printf("trying to initiate to %v", targetAddr) llg.initiateConn(PeerInfo{ - Addr: seeder.ListenAddrs()[0], + Addr: targetAddr, }, true, false) llg.cl.unlock() wg.Wait()