Attempt holepunch after initial dial fails

This commit is contained in:
Matt Joiner 2023-05-11 13:03:54 +10:00
parent c8fd8884b1
commit a11739a667
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
6 changed files with 223 additions and 255 deletions

269
client.go
View File

@ -19,8 +19,6 @@ import (
"strconv"
"time"
"github.com/anacrolix/torrent/internal/panicif"
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
@ -621,51 +619,14 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
// Returns a connection over UTP or TCP, whichever is first to connect.
func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
{
t := perf.NewTimer(perf.CallerName(0))
defer func() {
if res.Conn == nil {
t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
} else {
t.Mark("returned conn over " + res.Dialer.DialerNetwork())
}
}()
pool := dialPool{
addr: addr,
}
ctx, cancel := context.WithCancel(ctx)
// As soon as we return one connection, cancel the others.
defer cancel()
left := 0
resCh := make(chan DialResult, left)
defer pool.startDrainer()
for _, _s := range dialers {
left++
s := _s
go func() {
resCh <- DialResult{
dialFromSocket(ctx, s, addr),
s,
}
}()
pool.add(ctx, _s)
}
// Wait for a successful connection.
func() {
defer perf.ScopeTimer()()
for ; left > 0 && res.Conn == nil; left-- {
res = <-resCh
}
}()
// There are still uncompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
if conn != nil {
conn.Close()
}
}
}()
if res.Conn != nil {
go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
}
return res
return pool.getFirst()
}
func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
@ -732,119 +693,18 @@ func (cl *Client) initiateProtocolHandshakes(
return
}
func (cl *Client) waitForRendezvousConnect(ctx context.Context, rz *utHolepunchRendezvous) error {
for {
switch {
case rz.gotConnect.IsSet():
return nil
case len(rz.relays) == 0:
return errors.New("all relays failed")
case ctx.Err() != nil:
return context.Cause(ctx)
}
relayCond := rz.relayCond.Signaled()
cl.unlock()
select {
case <-rz.gotConnect.Done():
case <-relayCond:
case <-ctx.Done():
}
cl.lock()
}
}
// Returns nil connection and nil error if no connection could be established for valid reasons.
func (cl *Client) initiateRendezvousConnect(
t *Torrent, holepunchAddr netip.AddrPort,
) (ok bool, err error) {
cl.lock()
defer cl.unlock()
rz, err := t.startHolepunchRendezvous(holepunchAddr)
if err != nil {
return
}
if rz == nil {
return
}
ok = true
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err = cl.waitForRendezvousConnect(ctx, rz)
delete(t.utHolepunchRendezvous, holepunchAddr)
if err != nil {
err = fmt.Errorf("waiting for rendezvous connect signal: %w", err)
}
return
}
// Returns nil connection and nil error if no connection could be established for valid reasons.
func (cl *Client) establishOutgoingConnEx(
opts outgoingConnOpts,
func doProtocolHandshakeOnDialResult(
t *Torrent,
obfuscatedHeader bool,
addr PeerRemoteAddr,
dr DialResult,
) (
_ *PeerConn, err error,
c *PeerConn, err error,
) {
t := opts.t
addr := opts.addr
holepunchAddr, err := addrPortFromPeerRemoteAddr(addr)
var sentRendezvous bool
if err == nil {
if !opts.skipHolepunchRendezvous {
sentRendezvous, err = cl.initiateRendezvousConnect(t, holepunchAddr)
if err != nil {
err = fmt.Errorf("initiating rendezvous connect: %w", err)
}
}
}
gotHolepunchConnect := (err == nil && sentRendezvous) || opts.receivedHolepunchConnect
if opts.requireRendezvous && !sentRendezvous {
return nil, err
}
if err != nil {
t.logger.Print(err)
}
dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
cl.rLock()
defer cl.rUnlock()
return t.dialTimeout()
}())
defer cancel()
dr := cl.dialFirst(dialCtx, addr.String())
cl := t.cl
nc := dr.Conn
cl.lock()
if gotHolepunchConnect && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
g.MakeMapIfNilAndSet(
&cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
holepunchAddr,
struct{}{},
)
}
cl.unlock()
if nc == nil {
if !sentRendezvous && !gotHolepunchConnect {
cl.lock()
g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
cl.unlock()
}
if dialCtx.Err() != nil {
return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
}
return nil, errors.New("dial failed")
}
if gotHolepunchConnect {
panicif.False(holepunchAddr.IsValid())
cl.lock()
if g.MapContains(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, holepunchAddr) {
g.MakeMapIfNilAndSet(
&cl.dialableOnlyAfterHolepunch,
holepunchAddr,
struct{}{},
)
}
cl.unlock()
}
addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(
c, err = cl.initiateProtocolHandshakes(
context.Background(), nc, t, obfuscatedHeader,
newConnectionOpts{
outgoing: true,
@ -860,69 +720,128 @@ func (cl *Client) establishOutgoingConnEx(
return c, err
}
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
func (cl *Client) establishOutgoingConn(opts outgoingConnOpts) (c *PeerConn, err error) {
// Returns nil connection and nil error if no connection could be established for valid reasons.
func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
torrent.Add("establish outgoing connection", 1)
obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
c, err = cl.establishOutgoingConnEx(opts, obfuscatedHeaderFirst)
addr := opts.peerInfo.Addr
dialPool := dialPool{
resCh: make(chan DialResult),
addr: addr.String(),
}
defer dialPool.startDrainer()
dialTimeout := opts.t.getDialTimeoutUnlocked()
{
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
for _, d := range cl.dialers {
dialPool.add(ctx, d)
}
}
holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect {
g.MakeMapIfNilAndSet(
&cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
holepunchAddr,
struct{}{},
)
}
headerObfuscationPolicy := opts.HeaderObfuscationPolicy
obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
firstDialResult := dialPool.getFirst()
if firstDialResult.Conn == nil {
// No dialers worked. Try to initiate a holepunching rendezvous.
if holepunchAddrErr == nil {
if !opts.receivedHolepunchConnect {
cl.lock()
g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
cl.unlock()
}
opts.t.startHolepunchRendezvous(holepunchAddr)
}
err = fmt.Errorf("all initial dials failed")
return
}
if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
}
c, err = doProtocolHandshakeOnDialResult(
opts.t,
obfuscatedHeaderFirst,
addr,
firstDialResult,
)
if err == nil {
torrent.Add("initiated conn with preferred header obfuscation", 1)
return
}
// cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
if cl.config.HeaderObfuscationPolicy.RequirePreferred {
// We should have just tried with the preferred header obfuscation. If it was required,
// there's nothing else to try.
// We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
if headerObfuscationPolicy.RequirePreferred {
return
}
// Try again with encryption if we didn't earlier, or without if we did.
c, err = cl.establishOutgoingConnEx(opts, !obfuscatedHeaderFirst)
// Reuse the dialer that returned already but failed to handshake.
{
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
dialPool.add(ctx, firstDialResult.Dialer)
}
secondDialResult := dialPool.getFirst()
if secondDialResult.Conn == nil {
return
}
c, err = doProtocolHandshakeOnDialResult(
opts.t,
!obfuscatedHeaderFirst,
addr,
secondDialResult,
)
if err == nil {
torrent.Add("initiated conn with fallback header obfuscation", 1)
return
}
// cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
return
}
type outgoingConnOpts struct {
t *Torrent
addr PeerRemoteAddr
peerInfo PeerInfo
t *Torrent
// Don't attempt to connect unless a connect message is received after initiating a rendezvous.
requireRendezvous bool
// Don't send rendezvous requests to eligible relays.
skipHolepunchRendezvous bool
// Outgoing connection attempt is in response to holepunch connect message.
receivedHolepunchConnect bool
HeaderObfuscationPolicy HeaderObfuscationPolicy
}
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
func (cl *Client) outgoingConnection(
opts outgoingConnOpts,
ps PeerSource,
trusted bool,
attemptKey outgoingConnAttemptKey,
) {
cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(opts)
c, err := cl.dialAndCompleteHandshake(opts)
if err == nil {
c.conn.SetWriteDeadline(time.Time{})
}
cl.lock()
defer cl.unlock()
// Don't release lock between here and addPeerConn, unless it's for
// failure.
cl.noLongerHalfOpen(opts.t, opts.addr.String(), attemptKey)
// Don't release lock between here and addPeerConn, unless it's for failure.
cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
if err != nil {
if cl.config.Debug {
cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err)
cl.logger.Levelf(
log.Debug,
"error establishing outgoing connection to %v: %v",
opts.peerInfo.Addr,
err,
)
}
return
}
defer c.close()
c.Discovery = ps
c.trusted = trusted
c.Discovery = opts.peerInfo.Source
c.trusted = opts.peerInfo.Trusted
opts.t.runHandshookConnLoggingErr(c)
}

43
dial-pool.go Normal file
View File

@ -0,0 +1,43 @@
package torrent
import (
"context"
)
type dialPool struct {
resCh chan DialResult
addr string
left int
}
func (me *dialPool) getFirst() (res DialResult) {
for me.left > 0 && res.Conn == nil {
res = <-me.resCh
me.left--
}
return
}
func (me *dialPool) add(ctx context.Context, dialer Dialer) {
me.left++
go func() {
me.resCh <- DialResult{
dialFromSocket(ctx, dialer, me.addr),
dialer,
}
}()
}
func (me *dialPool) startDrainer() {
go me.drainAndCloseRemainingDials()
}
func (me *dialPool) drainAndCloseRemainingDials() {
for me.left > 0 {
conn := (<-me.resCh).Conn
me.left--
if conn != nil {
conn.Close()
}
}
}

View File

@ -7,10 +7,12 @@ import (
// http://www.bittorrent.org/beps/bep_0010.html
type (
ExtendedHandshakeMessage struct {
M map[ExtensionName]ExtensionNumber `bencode:"m"`
V string `bencode:"v,omitempty"`
Reqq int `bencode:"reqq,omitempty"`
Encryption bool `bencode:"e,omitempty"`
M map[ExtensionName]ExtensionNumber `bencode:"m"`
V string `bencode:"v,omitempty"`
Reqq int `bencode:"reqq,omitempty"`
// The only mention of this I can find is in https://www.bittorrent.org/beps/bep_0011.html
// for bit 0x01.
Encryption bool `bencode:"e"`
// BEP 9
MetadataSize int `bencode:"metadata_size,omitempty"`
// The local client port. It would be redundant for the receiving side of

View File

@ -106,8 +106,6 @@ type Torrent struct {
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
// The final ess is not silent here as it's in the plural.
utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
@ -1383,7 +1381,15 @@ func (t *Torrent) openNewConns() (initiated int) {
return
}
p := t.peers.PopMax()
t.initiateConn(p, false, false, false, false)
opts := outgoingConnOpts{
peerInfo: p,
t: t,
requireRendezvous: false,
skipHolepunchRendezvous: false,
receivedHolepunchConnect: false,
HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
}
initiateConn(opts, false)
initiated++
}
return
@ -2398,13 +2404,12 @@ func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
// Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
// sure all the PeerInfo fields are being used.
func (t *Torrent) initiateConn(
peer PeerInfo,
requireRendezvous bool,
skipHolepunchRendezvous bool,
func initiateConn(
opts outgoingConnOpts,
ignoreLimits bool,
receivedHolepunchConnect bool,
) {
t := opts.t
peer := opts.peerInfo
if peer.Id == t.cl.peerID {
return
}
@ -2424,15 +2429,7 @@ func (t *Torrent) initiateConn(
attemptKey := &peer
t.addHalfOpen(addrStr, attemptKey)
go t.cl.outgoingConnection(
outgoingConnOpts{
t: t,
addr: peer.Addr,
requireRendezvous: requireRendezvous,
skipHolepunchRendezvous: skipHolepunchRendezvous,
receivedHolepunchConnect: receivedHolepunchConnect,
},
peer.Source,
peer.Trusted,
opts,
attemptKey,
)
}
@ -2804,40 +2801,32 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer
}
return nil
case utHolepunch.Connect:
t.logger.Printf("got holepunch connect from %v for %v", sender, msg.AddrPort)
rz, ok := t.utHolepunchRendezvous[msg.AddrPort]
if ok {
delete(rz.relays, sender)
rz.gotConnect.Set()
rz.relayCond.Broadcast()
} else {
// If the rendezvous was removed because we timed out or already got a connect signal,
// it doesn't hurt to try again.
t.initiateConn(PeerInfo{
Addr: msg.AddrPort,
Source: PeerSourceUtHolepunch,
}, false, true, true, true)
opts := outgoingConnOpts{
peerInfo: PeerInfo{
Addr: msg.AddrPort,
Source: PeerSourceUtHolepunch,
PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
},
t: t,
// Don't attempt to start our own rendezvous if we fail to connect.
skipHolepunchRendezvous: true,
receivedHolepunchConnect: true,
// Assume that the other end initiated the rendezvous, and will use our preferred
// encryption. So we will act normally.
HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
}
initiateConn(opts, true)
return nil
case utHolepunch.Error:
rz, ok := t.utHolepunchRendezvous[msg.AddrPort]
if ok {
delete(rz.relays, sender)
rz.relayCond.Broadcast()
}
t.logger.Printf("received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
return nil
default:
return fmt.Errorf("unhandled msg type %v", msg.MsgType)
}
}
func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolepunchRendezvous, err error) {
if MapContains(t.utHolepunchRendezvous, addrPort) {
err = errors.New("rendezvous already exists")
return
}
g.InitNew(&rz)
func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) error {
rzsSent := 0
for pc := range t.conns {
if !pc.supportsExtension(utHolepunch.ExtensionName) {
continue
@ -2847,17 +2836,14 @@ func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolep
continue
}
}
t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
MakeMapIfNilAndSet(&rz.relays, pc, struct{}{})
rzsSent++
}
if len(rz.relays) == 0 {
err = fmt.Errorf("no eligible relays")
return
if rzsSent == 0 {
return errors.New("no eligible relays")
}
if !MakeMapIfNilAndSet(&t.utHolepunchRendezvous, addrPort, rz) {
panic("expected to fail earlier if rendezvous already exists")
}
return
return nil
}
func (t *Torrent) numHalfOpenAttempts() (num int) {
@ -2866,3 +2852,18 @@ func (t *Torrent) numHalfOpenAttempts() (num int) {
}
return
}
func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
cl := t.cl
cl.rLock()
defer cl.rUnlock()
return t.dialTimeout()
}
func (t *Torrent) startHolepunchRendezvousForPeerRemoteAddr(addr PeerRemoteAddr) error {
addrPort, err := addrPortFromPeerRemoteAddr(addr)
if err != nil {
return err
}
return t.startHolepunchRendezvous(addrPort)
}

View File

@ -1,11 +1 @@
package torrent
import (
"github.com/anacrolix/chansync"
)
type utHolepunchRendezvous struct {
relays map[*PeerConn]struct{}
gotConnect chansync.SetOnce
relayCond chansync.BroadcastCond
}

View File

@ -54,7 +54,8 @@ func TestHolepunchConnect(t *testing.T) {
cfg.Seed = false
cfg.DataDir = t.TempDir()
cfg.MaxAllocPeerRequestDataPerConn = 4
//cfg.Debug = true
cfg.Debug = true
cfg.NominalDialTimeout = time.Second
//cfg.DisableUTP = true
leecherLeecher, _ := NewClient(cfg)
require.NoError(t, err)
@ -88,26 +89,29 @@ func TestHolepunchConnect(t *testing.T) {
waitForConns(seederTorrent)
go llg.AddClientPeer(leecher)
waitForConns(llg)
//time.Sleep(time.Second)
time.Sleep(time.Second)
llg.cl.lock()
targetAddr := seeder.ListenAddrs()[1]
targetAddr := seeder.ListenAddrs()[0]
log.Printf("trying to initiate to %v", targetAddr)
llg.initiateConn(PeerInfo{
Addr: targetAddr,
}, true, false, false, false)
initiateConn(outgoingConnOpts{
peerInfo: PeerInfo{
Addr: targetAddr,
},
t: llg,
requireRendezvous: true,
skipHolepunchRendezvous: false,
HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
}, true)
llg.cl.unlock()
wg.Wait()
// These checks would require that the leecher leecher first attempt to connect without
// holepunching.
//llClientStats := leecherLeecher.Stats()
//c := qt.New(t)
//c.Check(llClientStats.NumPeersDialedRequiringHolepunch, qt.Not(qt.Equals), 0)
//c.Check(
// llClientStats.NumPeersDialedRequiringHolepunch,
// qt.Equals,
// llClientStats.NumPeersUndiableWithoutHolepunch,
//)
llClientStats := leecherLeecher.Stats()
c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0)
c.Check(
llClientStats.NumPeersDialableOnlyAfterHolepunch,
qt.Equals,
llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect,
)
}
func waitForConns(t *Torrent) {
@ -120,3 +124,12 @@ func waitForConns(t *Torrent) {
t.cl.event.Wait()
}
}
func TestDialTcpNotAccepting(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
c := qt.New(t)
c.Check(err, qt.IsNil)
defer l.Close()
_, err = net.Dial("tcp", l.Addr().String())
c.Assert(err, qt.IsNotNil)
}