diff --git a/client.go b/client.go index 4fac2722..342025ac 100644 --- a/client.go +++ b/client.go @@ -16,7 +16,6 @@ import ( "net/netip" "sort" "strconv" - "strings" "time" "github.com/anacrolix/chansync" @@ -646,7 +645,7 @@ func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResu res = <-resCh } }() - // There are still incompleted dials. + // There are still uncompleted dials. go func() { for ; left > 0; left-- { conn := (<-resCh).Conn @@ -663,8 +662,12 @@ func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResu func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { c, err := s.Dial(ctx, addr) + if err != nil { + log.Levelf(log.Debug, "error dialing %q: %v", addr, err) + } // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set - // it now in case we close the connection forthwith. + // it now in case we close the connection forthwith. Note this is also done in the TCP dialer + // code to increase the chance it's done. if tc, ok := c.(*net.TCPConn); ok { tc.SetLinger(0) } @@ -672,10 +675,6 @@ func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { return c } -func forgettableDialError(err error) bool { - return strings.Contains(err.Error(), "no suitable address found") -} - func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { if _, ok := t.halfOpen[addr]; !ok { panic("invariant broken") diff --git a/client_test.go b/client_test.go index 03851ae8..c8624395 100644 --- a/client_test.go +++ b/client_test.go @@ -254,6 +254,50 @@ func TestResponsive(t *testing.T) { assert.EqualValues(t, "d\n", string(b)) } +// TestResponsive was the first test to fail if uTP is disabled and TCP sockets dial from the +// listening port. +func TestResponsiveTcpOnly(t *testing.T) { + seederDataDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(seederDataDir) + cfg := TestingConfig(t) + cfg.DisableUTP = true + cfg.Seed = true + cfg.DataDir = seederDataDir + seeder, err := NewClient(cfg) + require.Nil(t, err) + defer seeder.Close() + seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + seederTorrent.VerifyData() + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) + cfg.DataDir = leecherDataDir + leecher, err := NewClient(cfg) + require.Nil(t, err) + defer leecher.Close() + leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + return + }()) + leecherTorrent.AddClientPeer(seeder) + reader := leecherTorrent.NewReader() + defer reader.Close() + reader.SetReadahead(0) + reader.SetResponsive() + b := make([]byte, 2) + _, err = reader.Seek(3, io.SeekStart) + require.NoError(t, err) + _, err = io.ReadFull(reader, b) + assert.Nil(t, err) + assert.EqualValues(t, "lo", string(b)) + _, err = reader.Seek(11, io.SeekStart) + require.NoError(t, err) + n, err := io.ReadFull(reader, b) + assert.Nil(t, err) + assert.EqualValues(t, 2, n) + assert.EqualValues(t, "d\n", string(b)) +} + func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { seederDataDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(seederDataDir) @@ -749,6 +793,7 @@ func TestClientAddressInUse(t *testing.T) { defer s.Close() } cfg := TestingConfig(t).SetListenAddr(":50007") + cfg.DisableUTP = false cl, err := NewClient(cfg) require.Error(t, err) require.Nil(t, cl) diff --git a/go.mod b/go.mod index 9ea01b96..d7bcf96f 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0 go.opentelemetry.io/otel/sdk v1.8.0 go.opentelemetry.io/otel/trace v1.8.0 - golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d + golang.org/x/sys v0.5.0 golang.org/x/time v0.0.0-20220609170525-579cf78fd858 ) @@ -101,7 +101,6 @@ require ( golang.org/x/crypto v0.5.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect - golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect diff --git a/socket.go b/socket.go index 127cd29a..22809e17 100644 --- a/socket.go +++ b/socket.go @@ -4,13 +4,12 @@ import ( "context" "net" "strconv" + "syscall" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/v2" "github.com/pkg/errors" - - "github.com/anacrolix/torrent/dialer" ) type Listener interface { @@ -38,13 +37,54 @@ func listen(n network, addr string, f firewallCallback, logger log.Logger) (sock } } +var tcpListenConfig = net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) (err error) { + controlErr := c.Control(func(fd uintptr) { + err = setReusePortSockOpts(fd) + }) + if err != nil { + return + } + err = controlErr + return + }, + // BitTorrent connections manage their own keep-alives. + KeepAlive: -1, +} + func listenTcp(network, address string) (s socket, err error) { - l, err := net.Listen(network, address) + l, err := tcpListenConfig.Listen(context.Background(), network, address) return tcpSocket{ Listener: l, NetworkDialer: NetworkDialer{ Network: network, - Dialer: dialer.Default, + Dialer: &net.Dialer{ + // My hope is that dialling out from a consistent port will improve the + // hole-punching behaviour. It might also prevent duplicate connections to the same + // peer if the peer does the same thing. There should probably be a fallback dialer + // if we fail to configure the socket to reuse ports for whatever reason. + LocalAddr: l.Addr(), + // We don't want fallback, as we explicitly manage the IPv4/IPv6 distinction + // ourselves, although it's probably not triggered as I think the network is already + // constrained to tcp4 or tcp6 at this point. + FallbackDelay: -1, + // BitTorrent connections manage their own keep-alives. + KeepAlive: tcpListenConfig.KeepAlive, + Control: func(network, address string, c syscall.RawConn) (err error) { + controlErr := c.Control(func(fd uintptr) { + err = setSockNoLinger(fd) + if err != nil { + // Failing to disable linger is undesirable, but not fatal. + log.Printf("error setting linger socket option on tcp socket: %v", err) + } + err = setReusePortSockOpts(fd) + }) + if err == nil { + err = controlErr + } + return + }, + }, }, }, err } diff --git a/sockopts.go b/sockopts.go new file mode 100644 index 00000000..54f307d1 --- /dev/null +++ b/sockopts.go @@ -0,0 +1,10 @@ +//go:build !wasm + +package torrent + +import "syscall" + +var lingerOffVal = syscall.Linger{ + Onoff: 0, + Linger: 0, +} diff --git a/sockopts_unix.go b/sockopts_unix.go new file mode 100644 index 00000000..52ec9e8d --- /dev/null +++ b/sockopts_unix.go @@ -0,0 +1,29 @@ +//go:build !windows && !wasm + +package torrent + +import ( + "syscall" + + "golang.org/x/sys/unix" +) + +func setReusePortSockOpts(fd uintptr) (err error) { + // I would use libp2p/go-reuseport to do this here, but no surprise it's + // implemented incorrectly. + + // Looks like we can get away with just REUSEPORT at least on Darwin, and probably by + // extension BSDs and Linux. + if false { + err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) + if err != nil { + return + } + } + err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1) + return +} + +func setSockNoLinger(fd uintptr) (err error) { + return syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal) +} diff --git a/sockopts_wasm.go b/sockopts_wasm.go new file mode 100644 index 00000000..9705b914 --- /dev/null +++ b/sockopts_wasm.go @@ -0,0 +1,12 @@ +package torrent + +// It's possible that we either need to use JS-specific way to allow port reuse, or to fall back to +// dialling TCP without forcing the local address to match the listener. If the fallback is +// implemented, then this should probably return an error to trigger it. +func setReusePortSockOpts(fd uintptr) error { + return nil +} + +func setSockNoLinger(fd uintptr) error { + return nil +} diff --git a/sockopts_windows.go b/sockopts_windows.go new file mode 100644 index 00000000..c3c0ab04 --- /dev/null +++ b/sockopts_windows.go @@ -0,0 +1,15 @@ +package torrent + +import ( + "syscall" + + "golang.org/x/sys/windows" +) + +func setReusePortSockOpts(fd uintptr) (err error) { + return windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_REUSEADDR, 1) +} + +func setSockNoLinger(fd uintptr) (err error) { + return syscall.SetsockoptLinger(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &lingerOffVal) +}