diff --git a/bep40_test.go b/bep40_test.go index 78b3ac21..48d5fdda 100644 --- a/bep40_test.go +++ b/bep40_test.go @@ -20,6 +20,10 @@ func TestBep40Priority(t *testing.T) { IpPort{IP: net.ParseIP("123.213.32.10"), Port: 0}, IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0}, )) + assert.Equal(t, peerPriority(0x2b41d456), bep40PriorityIgnoreError( + IpPort{IP: net.ParseIP("206.248.98.111"), Port: 0}, + IpPort{IP: net.ParseIP("142.147.89.224"), Port: 0}, + )) assert.EqualValues(t, "\x00\x00\x00\x00", func() []byte { b, _ := bep40PriorityBytes( IpPort{IP: net.ParseIP("123.213.32.234"), Port: 0}, diff --git a/client.go b/client.go index 70d3d103..dd03f338 100644 --- a/client.go +++ b/client.go @@ -551,8 +551,16 @@ func (cl *Client) incomingConnection(nc net.Conn) { if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } - c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(), - regularNetConnPeerConnConnString(nc)) + remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr()) + c := cl.newConnection( + nc, + newConnectionOpts{ + outgoing: false, + remoteAddr: nc.RemoteAddr(), + localPublicAddr: cl.publicAddr(remoteAddr.IP), + network: nc.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(nc), + }) defer func() { cl.lock() defer cl.unlock() @@ -687,13 +695,12 @@ func (cl *Client) initiateProtocolHandshakes( ctx context.Context, nc net.Conn, t *Torrent, - outgoing, encryptHeader bool, - remoteAddr PeerRemoteAddr, - network, connString string, + encryptHeader bool, + newConnOpts newConnectionOpts, ) ( c *PeerConn, err error, ) { - c = cl.newConnection(nc, outgoing, remoteAddr, network, connString) + c = cl.newConnection(nc, newConnOpts) c.headerEncrypted = encryptHeader ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() @@ -725,7 +732,15 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus } return nil, errors.New("dial failed") } - c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc)) + addrIpPort, _ := tryIpPortFromNetAddr(addr) + c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ + outgoing: true, + remoteAddr: addr, + // It would be possible to retrieve a public IP from the dialer used here? + localPublicAddr: cl.publicAddr(addrIpPort.IP), + network: dr.Dialer.DialerNetwork(), + connString: regularNetConnPeerConnConnString(nc), + }) if err != nil { nc.Close() } @@ -1468,28 +1483,37 @@ func (cl *Client) banPeerIP(ip net.IP) { } } -func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) { - if network == "" { - panic(remoteAddr) +type newConnectionOpts struct { + outgoing bool + remoteAddr PeerRemoteAddr + localPublicAddr peerLocalPublicAddr + network string + connString string +} + +func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) { + if opts.network == "" { + panic(opts.remoteAddr) } c = &PeerConn{ Peer: Peer{ - outgoing: outgoing, + outgoing: opts.outgoing, choking: true, peerChoking: true, PeerMaxRequests: 250, - RemoteAddr: remoteAddr, - Network: network, - callbacks: &cl.config.Callbacks, + RemoteAddr: opts.remoteAddr, + localPublicAddr: opts.localPublicAddr, + Network: opts.network, + callbacks: &cl.config.Callbacks, }, - connString: connString, + connString: opts.connString, conn: nc, } c.initRequestState() // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. - if remoteAddr != nil { - netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String()) + if opts.remoteAddr != nil { + netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String()) if err == nil { c.bannableAddr = Some(netipAddrPort.Addr()) } @@ -1501,7 +1525,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot l: cl.config.DownloadRateLimiter, r: c.r, } - c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing) + c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer) } diff --git a/peer.go b/peer.go index e1bab184..3b8d4586 100644 --- a/peer.go +++ b/peer.go @@ -1,5 +1,7 @@ package torrent +type peerLocalPublicAddr = IpPort + func (p *Peer) isLowOnRequests() bool { return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty() } diff --git a/peerconn.go b/peerconn.go index bd6b376b..77857d84 100644 --- a/peerconn.go +++ b/peerconn.go @@ -64,10 +64,13 @@ type Peer struct { peerImpl callbacks *Callbacks - outgoing bool - Network string - RemoteAddr PeerRemoteAddr - bannableAddr Option[bannableAddr] + outgoing bool + Network string + RemoteAddr PeerRemoteAddr + // The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the + // config. + localPublicAddr peerLocalPublicAddr + bannableAddr Option[bannableAddr] // True if the connection is operating over MSE obfuscation. headerEncrypted bool cryptoMethod mse.CryptoMethod @@ -1729,7 +1732,7 @@ func (c *PeerConn) setTorrent(t *Torrent) { } func (c *Peer) peerPriority() (peerPriority, error) { - return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp())) + return bep40Priority(c.remoteIpPort(), c.localPublicAddr) } func (c *Peer) remoteIp() net.IP { diff --git a/peerconn_test.go b/peerconn_test.go index 2f1fc56a..23d32286 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -25,7 +25,7 @@ func TestSendBitfieldThenHave(t *testing.T) { var cl Client cl.init(TestingConfig(t)) cl.initLogger() - c := cl.newConnection(nil, false, nil, "io.Pipe", "") + c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { t.Log(err) @@ -107,7 +107,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { t.onSetInfo() t._pendingPieces.Add(0) r, w := net.Pipe() - cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) + cn := cl.newConnection(r, newConnectionOpts{ + outgoing: true, + remoteAddr: r.RemoteAddr(), + network: r.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(r), + }) cn.setTorrent(t) mrlErrChan := make(chan error) msg := pp.Message{ @@ -287,7 +292,7 @@ func TestPreferredNetworkDirection(t *testing.T) { func TestReceiveLargeRequest(t *testing.T) { c := qt.New(t) cl := newTestingClient(t) - pc := cl.newConnection(nil, false, nil, "test", "") + pc := cl.newConnection(nil, newConnectionOpts{network: "test"}) tor := cl.newTorrentForTesting() tor.info = &metainfo.Info{PieceLength: 3 << 20} pc.setTorrent(tor) diff --git a/pexconn_test.go b/pexconn_test.go index 42c62cff..603398c5 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -17,7 +17,10 @@ func TestPexConnState(t *testing.T) { cl.initLogger() torrent := cl.newTorrent(metainfo.Hash{}, nil) addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} - c := cl.newConnection(nil, false, addr, addr.Network(), "") + c := cl.newConnection(nil, newConnectionOpts{ + remoteAddr: addr, + network: addr.Network(), + }) c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber) c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId c.messageWriter.mu.Lock() diff --git a/torrent.go b/torrent.go index f18fc252..157b44d4 100644 --- a/torrent.go +++ b/torrent.go @@ -1575,18 +1575,23 @@ func (t *Torrent) onWebRtcConn( DataChannelContext: dcc, } peerRemoteAddr := netConn.RemoteAddr() + //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr) if t.cl.badPeerAddr(peerRemoteAddr) { return } + localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, t, - dcc.LocalOffered, false, - netConn.RemoteAddr(), - webrtcNetwork, - fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + newConnectionOpts{ + outgoing: dcc.LocalOffered, + remoteAddr: peerRemoteAddr, + localPublicAddr: localAddrIpPort, + network: webrtcNetwork, + connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + }, ) if err != nil { t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)