2
0
mirror of synced 2025-02-23 06:08:07 +00:00

Synchronize holepunch connect messages with existing rendezvous

This commit is contained in:
Matt Joiner 2023-05-09 15:45:50 +10:00
parent e86e624415
commit 06a1aa0769
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
11 changed files with 322 additions and 46 deletions

View File

@ -11,3 +11,9 @@
* [A South American paper on peer-selection strategies for uploading](https://arxiv.org/pdf/1402.2187.pdf)
Has some useful overviews of piece-selection.
### Hole-punching
Holepunching is tracked in Torrent, rather than in Client, because if we send a rendezvous message and subsequently receive a connect message, we do not know whether a peer sent a rendezvous message to our relay and we're receiving the connect message for their rendezvous, or for ours. Relays are not required to respond to rendezvous, so we can't enforce a timeout. If we don't know who sent the rendezvous that triggered a connect, then we don't know what infohash to use in the handshake. Once we send a rendezvous and never receive a reply, we would have to always perform handshakes with our original infohash, or always copy the infohash the remote sends. Handling connects by always being the passive side in the handshake won't work, since the other side might use the same behaviour and neither will initiate.
If we only perform rendezvous through relays for the same torrent as the relay, then the handshake can be done actively for all connect messages. All connect messages received from a peer can only be for the same torrent for which we are connected to the peer.

View File

@ -19,8 +19,6 @@ import (
"strings"
"time"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
@ -45,6 +43,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
@ -715,8 +714,77 @@ func (cl *Client) initiateProtocolHandshakes(
return
}
// waitForRendezvousConnect blocks until the rendezvous has received a connect
// signal (returns nil), every relay has been removed from rz.relays (returns an
// error), or ctx is done (returns the context's cause).
//
// The client lock is released while blocking and re-acquired before the state
// is inspected again, so the caller is expected to hold the client lock on
// entry and will hold it again on return.
func (cl *Client) waitForRendezvousConnect(ctx context.Context, rz *utHolepunchRendezvous) error {
	for {
		if rz.gotConnect.IsSet() {
			return nil
		}
		if len(rz.relays) == 0 {
			return errors.New("all relays failed")
		}
		if ctx.Err() != nil {
			return context.Cause(ctx)
		}
		// Grab the relay-change signal before dropping the lock so that a
		// broadcast made between unlocking and selecting is not missed.
		relaysChanged := rz.relayCond.Signaled()
		cl.unlock()
		select {
		case <-rz.gotConnect.Done():
		case <-relaysChanged:
		case <-ctx.Done():
		}
		cl.lock()
	}
}
// Returns nil connection and nil error if no connection could be established for valid reasons.
func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
// initiateRendezvousConnect starts a holepunch rendezvous for addr on torrent t
// and waits up to two seconds for a connect signal.
//
// ok reports whether a rendezvous was actually started. err is non-nil if the
// address could not be converted, the rendezvous could not be started, or the
// wait ended without a connect signal. The rendezvous entry is removed from the
// torrent once waiting finishes, regardless of the outcome.
func (cl *Client) initiateRendezvousConnect(
	t *Torrent, addr PeerRemoteAddr,
) (ok bool, err error) {
	target, err := addrPortFromPeerRemoteAddr(addr)
	if err != nil {
		return false, err
	}
	cl.lock()
	defer cl.unlock()
	rendezvous, err := t.startHolepunchRendezvous(target)
	if err != nil || rendezvous == nil {
		return false, err
	}
	waitCtx, stop := context.WithTimeout(context.Background(), 2*time.Second)
	defer stop()
	waitErr := cl.waitForRendezvousConnect(waitCtx, rendezvous)
	delete(t.utHolepunchRendezvous, target)
	if waitErr != nil {
		return true, fmt.Errorf("waiting for rendezvous connect signal: %w", waitErr)
	}
	return true, nil
}
// Returns nil connection and nil error if no connection could be established for valid reasons.
func (cl *Client) establishOutgoingConnEx(
opts outgoingConnOpts,
obfuscatedHeader bool,
) (
_ *PeerConn, err error,
) {
t := opts.t
addr := opts.addr
var rzOk bool
if !opts.skipHolepunchRendezvous {
rzOk, err = cl.initiateRendezvousConnect(t, addr)
if err != nil {
err = fmt.Errorf("initiating rendezvous connect: %w", err)
}
}
if opts.requireRendezvous && !rzOk {
return nil, err
}
if err != nil {
log.Print(err)
}
dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
cl.rLock()
defer cl.rUnlock()
@ -750,10 +818,10 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
func (cl *Client) establishOutgoingConn(opts outgoingConnOpts) (c *PeerConn, err error) {
torrent.Add("establish outgoing connection", 1)
obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
c, err = cl.establishOutgoingConnEx(opts, obfuscatedHeaderFirst)
if err == nil {
torrent.Add("initiated conn with preferred header obfuscation", 1)
return
@ -765,7 +833,7 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *Pee
return
}
// Try again with encryption if we didn't earlier, or without if we did.
c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
c, err = cl.establishOutgoingConnEx(opts, !obfuscatedHeaderFirst)
if err == nil {
torrent.Add("initiated conn with fallback header obfuscation", 1)
}
@ -773,11 +841,20 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *Pee
return
}
// outgoingConnOpts bundles the parameters describing a single outgoing
// connection attempt.
type outgoingConnOpts struct {
// The torrent the connection is being made for.
t *Torrent
// The remote address to connect to.
addr PeerRemoteAddr
// Don't attempt to connect unless a connect message is received after initiating a rendezvous.
requireRendezvous bool
// Don't send rendezvous requests to eligible relays.
skipHolepunchRendezvous bool
}
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
func (cl *Client) outgoingConnection(opts outgoingConnOpts, ps PeerSource, trusted bool) {
cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(t, addr)
c, err := cl.establishOutgoingConn(opts)
if err == nil {
c.conn.SetWriteDeadline(time.Time{})
}
@ -785,17 +862,17 @@ func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSou
defer cl.unlock()
// Don't release lock between here and addPeerConn, unless it's for
// failure.
cl.noLongerHalfOpen(t, addr.String())
cl.noLongerHalfOpen(opts.t, opts.addr.String())
if err != nil {
if cl.config.Debug {
cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", addr, err)
cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err)
}
return
}
defer c.close()
c.Discovery = ps
c.trusted = trusted
t.runHandshookConnLoggingErr(c)
opts.t.runHandshookConnLoggingErr(c)
}
// The port number for incoming peer connections. 0 if the client isn't listening.

2
go.mod
View File

@ -49,6 +49,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0
go.opentelemetry.io/otel/sdk v1.8.0
go.opentelemetry.io/otel/trace v1.8.0
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
)
@ -98,7 +99,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0 // indirect
go.opentelemetry.io/proto/otlp v0.18.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.5.0 // indirect

2
go.sum
View File

@ -76,8 +76,6 @@ github.com/anacrolix/envpprof v1.2.1 h1:25TJe6t/i0AfzzldiGFKCpD+s+dk8lONBcacJZB2
github.com/anacrolix/envpprof v1.2.1/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4=
github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do=
github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A=
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0=

View File

@ -0,0 +1,27 @@
package utHolepunch
// ErrCode is the numeric error code carried by a ut_holepunch error message.
// It implements the error interface.
type ErrCode uint32

var _ error = ErrCode(0)

const (
	// NoSuchPeer: the target endpoint is invalid.
	NoSuchPeer ErrCode = iota + 1
	// NotConnected: the relaying peer is not connected to the target peer.
	NotConnected
	// NoSupport: the target peer does not support the holepunch extension.
	NoSupport
	// NoSelf: the target endpoint belongs to the relaying peer.
	NoSelf
)

// Error returns a human-readable description of the error code.
//
// Codes outside the known set yield a generic description rather than a panic:
// the value originates from a remote peer, so an unexpected code must not be
// able to crash or garble whatever formats or logs the error.
func (ec ErrCode) Error() string {
	switch ec {
	case NoSuchPeer:
		return "target endpoint is invalid"
	case NotConnected:
		return "the relaying peer is not connected to the target peer"
	case NoSupport:
		return "the target peer does not support the holepunch extension"
	case NoSelf:
		return "the target endpoint belongs to the relaying peer"
	default:
		return "unknown ut_holepunch error code"
	}
}

View File

@ -17,7 +17,6 @@ type (
}
MsgType byte
AddrType byte
ErrCode uint32
)
const (
@ -31,13 +30,6 @@ const (
Ipv6 AddrType = iota
)
const (
NoSuchPeer ErrCode = iota + 1
NotConnected
NoSupport
NoSelf
)
func (m *Msg) UnmarshalBinary(b []byte) error {
if len(b) < 12 {
return fmt.Errorf("buffer too small to be valid")

View File

@ -14,8 +14,6 @@ import (
"strings"
"time"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
"github.com/RoaringBitmap/roaring"
. "github.com/anacrolix/generics"
"github.com/anacrolix/log"
@ -28,6 +26,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
)
// Maintains the state of a BitTorrent-protocol based connection with a peer.
@ -891,6 +890,8 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err)
return
}
err = c.t.handleReceivedUtHolepunchMsg(msg, c)
return
default:
return fmt.Errorf("unexpected extended message ID: %v", id)
}

View File

@ -17,8 +17,6 @@ import (
"time"
"unsafe"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
@ -41,6 +39,7 @@ import (
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/storage"
@ -106,6 +105,8 @@ type Torrent struct {
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
halfOpen map[string]PeerInfo
// The final ess is not silent here as it's in the plural.
utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
@ -1378,7 +1379,7 @@ func (t *Torrent) openNewConns() (initiated int) {
return
}
p := t.peers.PopMax()
t.initiateConn(p)
t.initiateConn(p, false, false)
initiated++
}
return
@ -2361,7 +2362,11 @@ func (t *Torrent) VerifyData() {
// Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
// sure all the PeerInfo fields are being used.
func (t *Torrent) initiateConn(peer PeerInfo) {
func (t *Torrent) initiateConn(
peer PeerInfo,
requireRendezvous bool,
skipHolepunchRendezvous bool,
) {
if peer.Id == t.cl.peerID {
return
}
@ -2374,7 +2379,12 @@ func (t *Torrent) initiateConn(peer PeerInfo) {
}
t.cl.numHalfOpen++
t.halfOpen[addr.String()] = peer
go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
go t.cl.outgoingConnection(outgoingConnOpts{
t: t,
addr: peer.Addr,
requireRendezvous: requireRendezvous,
skipHolepunchRendezvous: skipHolepunchRendezvous,
}, peer.Source, peer.Trusted)
}
// Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
@ -2701,21 +2711,24 @@ func makeUtHolepunchMsgForPeerConn(
}
}
// sendUtHolepunchMsg builds a ut_holepunch message of the given type for the
// given address and error code, and writes it to pc.
func sendUtHolepunchMsg(
	pc *PeerConn,
	msgType utHolepunch.MsgType,
	addrPort netip.AddrPort,
	errCode utHolepunch.ErrCode,
) {
	holepunchMsg := makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode)
	pc.write(holepunchMsg)
}
func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
switch msg.MsgType {
case utHolepunch.Rendezvous:
sendMsg := func(
pc *PeerConn,
msgType utHolepunch.MsgType,
addrPort netip.AddrPort,
errCode utHolepunch.ErrCode,
) {
pc.write(makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode))
}
log.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
sendMsg := sendUtHolepunchMsg
targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort)
if len(targets) == 0 {
sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
break
return nil
}
for _, pc := range targets {
if !pc.supportsExtension(utHolepunch.ExtensionName) {
@ -2725,14 +2738,60 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer
sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0)
}
return nil
case utHolepunch.Connect:
t.initiateConn(PeerInfo{
Addr: msg.AddrPort,
Source: PeerSourceUtHolepunch,
})
log.Printf("got holepunch connect from %v for %v", sender, msg.AddrPort)
rz, ok := t.utHolepunchRendezvous[msg.AddrPort]
if ok {
delete(rz.relays, sender)
rz.gotConnect.Set()
rz.relayCond.Broadcast()
} else {
// If the rendezvous was removed because we timed out or already got a connect signal,
// it doesn't hurt to try again.
t.initiateConn(PeerInfo{
Addr: msg.AddrPort,
Source: PeerSourceUtHolepunch,
}, false, true)
}
return nil
case utHolepunch.Error:
rz, ok := t.utHolepunchRendezvous[msg.AddrPort]
if ok {
delete(rz.relays, sender)
rz.relayCond.Broadcast()
}
log.Printf("received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
return nil
default:
return fmt.Errorf("unhandled msg type %v", msg.MsgType)
}
}
// startHolepunchRendezvous sends a rendezvous request for addrPort to every
// eligible relay among the torrent's connections and registers the resulting
// rendezvous under addrPort.
//
// A connection is an eligible relay if it supports the holepunch extension and,
// when it also supports PEX, lists addrPort among its remote live connections.
// An error is returned if a rendezvous for addrPort already exists or if no
// relay was eligible; in the latter case the unregistered rendezvous value is
// still returned alongside the error.
func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolepunchRendezvous, err error) {
	if MapContains(t.utHolepunchRendezvous, addrPort) {
		return nil, errors.New("rendezvous already exists")
	}
	g.InitNew(&rz)
	for relay := range t.conns {
		if !relay.supportsExtension(utHolepunch.ExtensionName) {
			continue
		}
		// Relays that speak PEX are only used if they report the target as live.
		if relay.supportsExtension(pp.ExtensionNamePex) &&
			!g.MapContains(relay.pex.remoteLiveConns, addrPort) {
			continue
		}
		sendUtHolepunchMsg(relay, utHolepunch.Rendezvous, addrPort, 0)
		MakeMapIfNilAndSet(&rz.relays, relay, struct{}{})
	}
	if len(rz.relays) == 0 {
		return rz, fmt.Errorf("no eligible relays")
	}
	// The existence check above should guarantee this insertion is fresh.
	registered := MakeMapIfNilAndSet(&t.utHolepunchRendezvous, addrPort, rz)
	if !registered {
		panic("expected to fail earlier if rendezvous already exists")
	}
	return rz, nil
}

View File

@ -9,7 +9,7 @@ import (
"sync"
"testing"
"github.com/anacrolix/generics"
g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
@ -232,12 +232,12 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
gotMetainfoC: make(chan struct{}),
}
tt.setChunkSize(2)
generics.MakeMapIfNil(&tt.conns)
g.MakeMapIfNil(&tt.conns)
pc := PeerConn{}
pc.t = &tt
pc.peerImpl = &pc
pc.initRequestState()
generics.InitNew(&pc.callbacks)
g.InitNew(&pc.callbacks)
tt.conns[&pc] = struct{}{}
err = pc.peerSentHave(0)
c.Assert(err, qt.IsNil)

11
ut-holepunching.go Normal file
View File

@ -0,0 +1,11 @@
package torrent
import (
"github.com/anacrolix/chansync"
)
// utHolepunchRendezvous tracks an in-progress holepunch rendezvous for a single
// target address.
type utHolepunchRendezvous struct {
// Connections the rendezvous request was sent to that have not yet answered
// with a connect or error message.
relays map[*PeerConn]struct{}
// Set once a connect message for the target address has been received.
gotConnect chansync.SetOnce
// Broadcast whenever a relay is removed from relays.
relayCond chansync.BroadcastCond
}

105
ut-holepunching_test.go Normal file
View File

@ -0,0 +1,105 @@
package torrent
import (
"os"
"sync"
"testing"
"testing/iotest"
"github.com/anacrolix/torrent/internal/testutil"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Check that a client can download from a peer that does not accept incoming
// connections by requiring a holepunch rendezvous through a relay both are
// connected to. The clients are connected like so: Seeder <-> Leecher <->
// LeecherLeecher, where Leecher acts as the relay and LeecherLeecher initiates
// a rendezvous-only connection to Seeder.
func TestHolepunchConnect(t *testing.T) {
	greetingTempDir, mi := testutil.GreetingTestTorrent()
	defer os.RemoveAll(greetingTempDir)

	cfg := TestingConfig(t)
	cfg.Seed = true
	cfg.MaxAllocPeerRequestDataPerConn = 4
	cfg.DataDir = greetingTempDir
	cfg.DisablePEX = true
	//cfg.Debug = true
	cfg.AcceptPeerConnections = false
	seeder, err := NewClient(cfg)
	require.NoError(t, err)
	defer seeder.Close()
	defer testutil.ExportStatusWriter(seeder, "s", t)()
	seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
	require.NoError(t, err)
	assert.True(t, ok)
	seederTorrent.VerifyData()

	cfg = TestingConfig(t)
	cfg.Seed = true
	cfg.DataDir = t.TempDir()
	cfg.AlwaysWantConns = true
	// This way the leecher leecher will still try to use this peer as a relay, but won't be told
	// about the seeder via PEX.
	//cfg.DisablePEX = true
	//cfg.Debug = true
	leecher, err := NewClient(cfg)
	require.NoError(t, err)
	defer leecher.Close()
	defer testutil.ExportStatusWriter(leecher, "l", t)()

	cfg = TestingConfig(t)
	cfg.Seed = false
	cfg.DataDir = t.TempDir()
	cfg.MaxAllocPeerRequestDataPerConn = 4
	cfg.Debug = true
	//cfg.DisableUTP = true
	// Capture the error here: discarding it would leave the check below
	// inspecting the stale error from the previous NewClient call.
	leecherLeecher, err := NewClient(cfg)
	require.NoError(t, err)
	defer leecherLeecher.Close()
	defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()

	leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
		ret = TorrentSpecFromMetaInfo(mi)
		ret.ChunkSize = 2
		return
	}())
	_ = leecherGreeting
	require.NoError(t, err)
	assert.True(t, ok)
	llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
		ret = TorrentSpecFromMetaInfo(mi)
		ret.ChunkSize = 3
		return
	}())
	require.NoError(t, err)
	assert.True(t, ok)

	// Read the whole torrent through the leecher leecher in the background;
	// the test finishes once the data has been verified.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		r := llg.NewReader()
		defer r.Close()
		qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
	}()

	// Connect the seeder and the leecher leecher to the relay before
	// attempting the rendezvous.
	go seederTorrent.AddClientPeer(leecher)
	waitForConns(seederTorrent)
	go llg.AddClientPeer(leecher)
	waitForConns(llg)

	// Only connect to the seeder if a rendezvous connect message is received.
	llg.cl.lock()
	llg.initiateConn(PeerInfo{
		Addr: seeder.ListenAddrs()[0],
	}, true, false)
	llg.cl.unlock()
	wg.Wait()
}
// waitForConns blocks until the torrent has at least one connection. It takes
// the client lock for the duration and waits on the client's event between
// checks.
func waitForConns(t *Torrent) {
	t.cl.lock()
	defer t.cl.unlock()
	for len(t.conns) == 0 {
		t.cl.event.Wait()
	}
}