Dial TCP with the listener's local addr

This commit is contained in:
Matt Joiner 2023-05-01 10:15:34 +10:00
parent b290350ba2
commit 599846546e
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
8 changed files with 162 additions and 13 deletions

View File

@ -16,7 +16,6 @@ import (
"net/netip"
"sort"
"strconv"
"strings"
"time"
"github.com/anacrolix/chansync"
@ -646,7 +645,7 @@ func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResu
res = <-resCh
}
}()
// There are still incompleted dials.
// There are still uncompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
@ -663,8 +662,12 @@ func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResu
func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
c, err := s.Dial(ctx, addr)
if err != nil {
log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
}
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
// it now in case we close the connection forthwith.
// it now in case we close the connection forthwith. Note this is also done in the TCP dialer
// code to increase the chance it's done.
if tc, ok := c.(*net.TCPConn); ok {
tc.SetLinger(0)
}
@ -672,10 +675,6 @@ func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
return c
}
func forgettableDialError(err error) bool {
return strings.Contains(err.Error(), "no suitable address found")
}
func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
if _, ok := t.halfOpen[addr]; !ok {
panic("invariant broken")

View File

@ -254,6 +254,50 @@ func TestResponsive(t *testing.T) {
assert.EqualValues(t, "d\n", string(b))
}
// TestResponsive was the first test to fail if uTP is disabled and TCP sockets dial from the
// listening port.
func TestResponsiveTcpOnly(t *testing.T) {
seederDataDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
cfg := TestingConfig(t)
cfg.DisableUTP = true
cfg.Seed = true
cfg.DataDir = seederDataDir
seeder, err := NewClient(cfg)
require.Nil(t, err)
defer seeder.Close()
seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
seederTorrent.VerifyData()
leecherDataDir := t.TempDir()
cfg = TestingConfig(t)
cfg.DataDir = leecherDataDir
leecher, err := NewClient(cfg)
require.Nil(t, err)
defer leecher.Close()
leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
return
}())
leecherTorrent.AddClientPeer(seeder)
reader := leecherTorrent.NewReader()
defer reader.Close()
reader.SetReadahead(0)
reader.SetResponsive()
b := make([]byte, 2)
_, err = reader.Seek(3, io.SeekStart)
require.NoError(t, err)
_, err = io.ReadFull(reader, b)
assert.Nil(t, err)
assert.EqualValues(t, "lo", string(b))
_, err = reader.Seek(11, io.SeekStart)
require.NoError(t, err)
n, err := io.ReadFull(reader, b)
assert.Nil(t, err)
assert.EqualValues(t, 2, n)
assert.EqualValues(t, "d\n", string(b))
}
func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
seederDataDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
@ -749,6 +793,7 @@ func TestClientAddressInUse(t *testing.T) {
defer s.Close()
}
cfg := TestingConfig(t).SetListenAddr(":50007")
cfg.DisableUTP = false
cl, err := NewClient(cfg)
require.Error(t, err)
require.Nil(t, cl)

3
go.mod
View File

@ -49,7 +49,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0
go.opentelemetry.io/otel/sdk v1.8.0
go.opentelemetry.io/otel/trace v1.8.0
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
golang.org/x/sys v0.5.0
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
)
@ -101,7 +101,6 @@ require (
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect

View File

@ -4,13 +4,12 @@ import (
"context"
"net"
"strconv"
"syscall"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/v2"
"github.com/pkg/errors"
"github.com/anacrolix/torrent/dialer"
)
type Listener interface {
@ -38,13 +37,54 @@ func listen(n network, addr string, f firewallCallback, logger log.Logger) (sock
}
}
var tcpListenConfig = net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) (err error) {
controlErr := c.Control(func(fd uintptr) {
err = setReusePortSockOpts(fd)
})
if err != nil {
return
}
err = controlErr
return
},
// BitTorrent connections manage their own keep-alives.
KeepAlive: -1,
}
func listenTcp(network, address string) (s socket, err error) {
l, err := net.Listen(network, address)
l, err := tcpListenConfig.Listen(context.Background(), network, address)
return tcpSocket{
Listener: l,
NetworkDialer: NetworkDialer{
Network: network,
Dialer: dialer.Default,
Dialer: &net.Dialer{
// My hope is that dialling out from a consistent port will improve the
// hole-punching behaviour. It might also prevent duplicate connections to the same
// peer if the peer does the same thing. There should probably be a fallback dialer
// if we fail to configure the socket to reuse ports for whatever reason.
LocalAddr: l.Addr(),
// We don't want fallback, as we explicitly manage the IPv4/IPv6 distinction
// ourselves, although it's probably not triggered as I think the network is already
// constrained to tcp4 or tcp6 at this point.
FallbackDelay: -1,
// BitTorrent connections manage their own keep-alives.
KeepAlive: tcpListenConfig.KeepAlive,
Control: func(network, address string, c syscall.RawConn) (err error) {
controlErr := c.Control(func(fd uintptr) {
err = setSockNoLinger(fd)
if err != nil {
// Failing to disable linger is undesirable, but not fatal.
log.Printf("error setting linger socket option on tcp socket: %v", err)
}
err = setReusePortSockOpts(fd)
})
if err == nil {
err = controlErr
}
return
},
},
},
}, err
}

10
sockopts.go Normal file
View File

@ -0,0 +1,10 @@
//go:build !wasm
package torrent
import "syscall"
var lingerOffVal = syscall.Linger{
Onoff: 0,
Linger: 0,
}

29
sockopts_unix.go Normal file
View File

@ -0,0 +1,29 @@
//go:build !windows && !wasm
package torrent
import (
"syscall"
"golang.org/x/sys/unix"
)
func setReusePortSockOpts(fd uintptr) (err error) {
// I would use libp2p/go-reuseport to do this here, but no surprise it's
// implemented incorrectly.
// Looks like we can get away with just REUSEPORT at least on Darwin, and probably by
// extension BSDs and Linux.
if false {
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return
}
}
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
return
}
func setSockNoLinger(fd uintptr) (err error) {
return syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal)
}

12
sockopts_wasm.go Normal file
View File

@ -0,0 +1,12 @@
package torrent
// It's possible that we either need to use JS-specific way to allow port reuse, or to fall back to
// dialling TCP without forcing the local address to match the listener. If the fallback is
// implemented, then this should probably return an error to trigger it.
func setReusePortSockOpts(fd uintptr) error {
return nil
}
func setSockNoLinger(fd uintptr) error {
return nil
}

15
sockopts_windows.go Normal file
View File

@ -0,0 +1,15 @@
package torrent
import (
"syscall"
"golang.org/x/sys/windows"
)
func setReusePortSockOpts(fd uintptr) (err error) {
return windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_REUSEADDR, 1)
}
func setSockNoLinger(fd uintptr) (err error) {
return syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal)
}