diff --git a/p2p/net/reuseport/dial.go b/p2p/net/reuseport/dial.go new file mode 100644 index 00000000..12a70b7e --- /dev/null +++ b/p2p/net/reuseport/dial.go @@ -0,0 +1,113 @@ +package tcpreuse + +import ( + "context" + "net" + + "github.com/libp2p/go-reuseport" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type dialer interface { + Dial(network, addr string) (net.Conn, error) + DialContext(ctx context.Context, network, addr string) (net.Conn, error) +} + +// Dial dials the given multiaddr, reusing ports we're currently listening on if +// possible. +// +// Dial attempts to be smart about choosing the source port. For example, If +// we're dialing a loopback address and we're listening on one or more loopback +// ports, Dial will randomly choose one of the loopback ports and addresses and +// reuse it. +func (t *Transport) Dial(raddr ma.Multiaddr) (manet.Conn, error) { + return t.DialContext(context.Background(), raddr) +} + +// DialContext is like Dial but takes a context. +func (t *Transport) DialContext(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) { + network, addr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + var d dialer + switch network { + case "tcp4": + d = t.v4.getDialer(network) + case "tcp6": + d = t.v6.getDialer(network) + default: + return nil, ErrWrongProto + } + conn, err := d.DialContext(ctx, network, addr) + if err != nil { + return nil, err + } + maconn, err := manet.WrapNetConn(conn) + if err != nil { + conn.Close() + return nil, err + } + return maconn, nil +} + +func (n *network) getDialer(network string) dialer { + n.mu.RLock() + d := n.dialer + n.mu.RUnlock() + if d == nil { + n.mu.Lock() + defer n.mu.Unlock() + + if n.dialer == nil { + n.dialer = n.makeDialer(network) + } + d = n.dialer + } + return d +} + +func (n *network) makeDialer(network string) dialer { + if !reuseport.Available() { + log.Debug("reuseport not available") + return &net.Dialer{} + } + + var unspec net.IP + switch network { + case "tcp4": + unspec = net.IPv4zero + case "tcp6": + unspec = net.IPv6unspecified + default: + panic("invalid network: must be either tcp4 or tcp6") + } + + // How many ports are we listening on. + var port = 0 + for l := range n.listeners { + newPort := l.Addr().(*net.TCPAddr).Port + switch { + case newPort == 0: // Any port, ignore (really, we shouldn't get this case...). + case port == 0: // Haven't selected a port yet, choose this one. + port = newPort + case newPort == port: // Same as the selected port, continue... + default: // Multiple ports, use the multi dialer + return newMultiDialer(unspec, n.listeners) + } + } + + // None. + if port == 0 { + return &net.Dialer{} + } + + // One. Always dial from the single port we're listening on. + laddr := &net.TCPAddr{ + IP: unspec, + Port: port, + } + + return (*singleDialer)(laddr) +} diff --git a/p2p/net/reuseport/listen.go b/p2p/net/reuseport/listen.go new file mode 100644 index 00000000..8bb2c402 --- /dev/null +++ b/p2p/net/reuseport/listen.go @@ -0,0 +1,80 @@ +package tcpreuse + +import ( + "net" + + "github.com/libp2p/go-reuseport" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type listener struct { + manet.Listener + network *network +} + +func (l *listener) Close() error { + l.network.mu.Lock() + delete(l.network.listeners, l) + l.network.dialer = nil + l.network.mu.Unlock() + return l.Listener.Close() +} + +// Listen listens on the given multiaddr. +// +// If reuseport is supported, it will be enabled for this listener and future +// dials from this transport may reuse the port. +// +// Note: You can listen on the same multiaddr as many times as you want +// (although only *one* listener will end up handling the inbound connection). +func (t *Transport) Listen(laddr ma.Multiaddr) (manet.Listener, error) { + nw, naddr, err := manet.DialArgs(laddr) + if err != nil { + return nil, err + } + var n *network + switch nw { + case "tcp4": + n = &t.v4 + case "tcp6": + n = &t.v6 + default: + return nil, ErrWrongProto + } + + if !reuseport.Available() { + return manet.Listen(laddr) + } + nl, err := reuseport.Listen(nw, naddr) + if err != nil { + return manet.Listen(laddr) + } + + if _, ok := nl.Addr().(*net.TCPAddr); !ok { + nl.Close() + return nil, ErrWrongProto + } + + malist, err := manet.WrapNetListener(nl) + if err != nil { + nl.Close() + return nil, err + } + + list := &listener{ + Listener: malist, + network: n, + } + + n.mu.Lock() + defer n.mu.Unlock() + + if n.listeners == nil { + n.listeners = make(map[*listener]struct{}) + } + n.listeners[list] = struct{}{} + n.dialer = nil + + return list, nil +} diff --git a/p2p/net/reuseport/multidialer.go b/p2p/net/reuseport/multidialer.go new file mode 100644 index 00000000..edb200f8 --- /dev/null +++ b/p2p/net/reuseport/multidialer.go @@ -0,0 +1,90 @@ +package tcpreuse + +import ( + "context" + "fmt" + "math/rand" + "net" + + "github.com/libp2p/go-netroute" +) + +type multiDialer struct { + listeningAddresses []*net.TCPAddr + loopback []*net.TCPAddr + unspecified []*net.TCPAddr + fallback net.TCPAddr +} + +func (d *multiDialer) Dial(network, addr string) (net.Conn, error) { + return d.DialContext(context.Background(), network, addr) +} + +func randAddr(addrs []*net.TCPAddr) *net.TCPAddr { + if len(addrs) > 0 { + return addrs[rand.Intn(len(addrs))] + } + return nil +} + +// DialContext dials a target addr. +// Dialing preference is +// * If there is a listener on the local interface the OS expects to use to route towards addr, use that. +// * If there is a listener on a loopback address, addr is loopback, use that. +// * If there is a listener on an undefined address (0.0.0.0 or ::), use that. +// * Use the fallback IP specified during construction, with a port that's already being listened on, if one exists. +func (d *multiDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + tcpAddr, err := net.ResolveTCPAddr(network, addr) + if err != nil { + return nil, err + } + ip := tcpAddr.IP + if !ip.IsLoopback() && !ip.IsGlobalUnicast() { + return nil, fmt.Errorf("undialable IP: %s", ip) + } + + if router, err := netroute.New(); err == nil { + if _, _, preferredSrc, err := router.Route(ip); err == nil { + for _, optAddr := range d.listeningAddresses { + if optAddr.IP.Equal(preferredSrc) { + return reuseDial(ctx, optAddr, network, addr) + } + } + } + } + + if ip.IsLoopback() && len(d.loopback) > 0 { + return reuseDial(ctx, randAddr(d.loopback), network, addr) + } + if len(d.unspecified) == 0 { + return reuseDial(ctx, &d.fallback, network, addr) + } + + return reuseDial(ctx, randAddr(d.unspecified), network, addr) +} + +func newMultiDialer(unspec net.IP, listeners map[*listener]struct{}) (m dialer) { + addrs := make([]*net.TCPAddr, 0) + loopback := make([]*net.TCPAddr, 0) + unspecified := make([]*net.TCPAddr, 0) + existingPort := 0 + + for l := range listeners { + addr := l.Addr().(*net.TCPAddr) + addrs = append(addrs, addr) + if addr.IP.IsLoopback() { + loopback = append(loopback, addr) + } else if addr.IP.IsGlobalUnicast() && existingPort == 0 { + existingPort = addr.Port + } else if addr.IP.IsUnspecified() { + unspecified = append(unspecified, addr) + } + } + m = &multiDialer{ + listeningAddresses: addrs, + loopback: loopback, + unspecified: unspecified, + fallback: net.TCPAddr{IP: unspec, Port: existingPort}, + } + return +} diff --git a/p2p/net/reuseport/reuseport.go b/p2p/net/reuseport/reuseport.go new file mode 100644 index 00000000..4299ddc0 --- /dev/null +++ b/p2p/net/reuseport/reuseport.go @@ -0,0 +1,35 @@ +package tcpreuse + +import ( + "context" + "net" + + reuseport "github.com/libp2p/go-reuseport" +) + +var fallbackDialer net.Dialer + +// Dials using reuseport and then redials normally if that fails. +func reuseDial(ctx context.Context, laddr *net.TCPAddr, network, raddr string) (con net.Conn, err error) { + if laddr == nil { + return fallbackDialer.DialContext(ctx, network, raddr) + } + + d := net.Dialer{ + LocalAddr: laddr, + Control: reuseport.Control, + } + + con, err = d.DialContext(ctx, network, raddr) + if err == nil { + return con, nil + } + + if reuseErrShouldRetry(err) && ctx.Err() == nil { + // We could have an existing socket open or we could have one + // stuck in TIME-WAIT. + log.Debugf("failed to reuse port, will try again with a random port: %s", err) + con, err = fallbackDialer.DialContext(ctx, network, raddr) + } + return con, err +} diff --git a/p2p/net/reuseport/reuseport_plan9.go b/p2p/net/reuseport/reuseport_plan9.go new file mode 100644 index 00000000..9318652c --- /dev/null +++ b/p2p/net/reuseport/reuseport_plan9.go @@ -0,0 +1,44 @@ +package tcpreuse + +import ( + "net" + "os" +) + +const ( + EADDRINUSE = "address in use" + ECONNREFUSED = "connection refused" +) + +// reuseErrShouldRetry diagnoses whether to retry after a reuse error. +// if we failed to bind, we should retry. if bind worked and this is a +// real dial error (remote end didnt answer) then we should not retry. +func reuseErrShouldRetry(err error) bool { + if err == nil { + return false // hey, it worked! no need to retry. + } + + // if it's a network timeout error, it's a legitimate failure. + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + return false + } + + e, ok := err.(*net.OpError) + if !ok { + return true + } + + e1, ok := e.Err.(*os.PathError) + if !ok { + return true + } + + switch e1.Err.Error() { + case EADDRINUSE: + return true + case ECONNREFUSED: + return false + default: + return true // optimistically default to retry. + } +} diff --git a/p2p/net/reuseport/reuseport_posix.go b/p2p/net/reuseport/reuseport_posix.go new file mode 100644 index 00000000..4f0d89c6 --- /dev/null +++ b/p2p/net/reuseport/reuseport_posix.go @@ -0,0 +1,37 @@ +//go:build !plan9 +// +build !plan9 + +package tcpreuse + +import ( + "net" + "syscall" +) + +// reuseErrShouldRetry diagnoses whether to retry after a reuse error. +// if we failed to bind, we should retry. if bind worked and this is a +// real dial error (remote end didnt answer) then we should not retry. +func reuseErrShouldRetry(err error) bool { + if err == nil { + return false // hey, it worked! no need to retry. + } + + // if it's a network timeout error, it's a legitimate failure. + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + return false + } + + errno, ok := err.(syscall.Errno) + if !ok { // not an errno? who knows what this is. retry. + return true + } + + switch errno { + case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL: + return true // failure to bind. retry. + case syscall.ECONNREFUSED: + return false // real dial error + default: + return true // optimistically default to retry. + } +} diff --git a/p2p/net/reuseport/reuseport_test.go b/p2p/net/reuseport/reuseport_test.go new file mode 100644 index 00000000..b7a080d7 --- /dev/null +++ b/p2p/net/reuseport/reuseport_test.go @@ -0,0 +1,51 @@ +//go:build !plan9 +// +build !plan9 + +package tcpreuse + +import ( + "net" + "syscall" + "testing" +) + +type netTimeoutErr struct { + timeout bool +} + +func (e netTimeoutErr) Error() string { + return "" +} + +func (e netTimeoutErr) Timeout() bool { + return e.timeout +} + +func (e netTimeoutErr) Temporary() bool { + panic("not checked") +} + +func TestReuseError(t *testing.T) { + var nte1 net.Error = &netTimeoutErr{true} + var nte2 net.Error = &netTimeoutErr{false} + + cases := map[error]bool{ + nil: false, + syscall.EADDRINUSE: true, + syscall.EADDRNOTAVAIL: true, + syscall.ECONNREFUSED: false, + + nte1: false, + nte2: true, // this ones a little weird... we should check neterror.Temporary() too + + // test 'default' to true + syscall.EBUSY: true, + } + + for k, v := range cases { + if reuseErrShouldRetry(k) != v { + t.Fatalf("expected %t for %#v", v, k) + } + } + +} diff --git a/p2p/net/reuseport/singledialer.go b/p2p/net/reuseport/singledialer.go new file mode 100644 index 00000000..efb96eb1 --- /dev/null +++ b/p2p/net/reuseport/singledialer.go @@ -0,0 +1,16 @@ +package tcpreuse + +import ( + "context" + "net" +) + +type singleDialer net.TCPAddr + +func (d *singleDialer) Dial(network, address string) (net.Conn, error) { + return d.DialContext(context.Background(), network, address) +} + +func (d *singleDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + return reuseDial(ctx, (*net.TCPAddr)(d), network, address) +} diff --git a/p2p/net/reuseport/transport.go b/p2p/net/reuseport/transport.go new file mode 100644 index 00000000..a9a12c4c --- /dev/null +++ b/p2p/net/reuseport/transport.go @@ -0,0 +1,35 @@ +// Package tcpreuse provides a basic transport for automatically (and intelligently) reusing TCP ports. +// +// To use, construct a new Transport and configure listeners tr.Listen(...). +// When dialing (tr.Dial(...)), the transport will attempt to reuse the ports it's currently listening on, +// choosing the best one depending on the destination address. +// +// It is recommended to set set SO_LINGER to 0 for all connections, otherwise +// reusing the port may fail when re-dialing a recently closed connection. +// See https://hea-www.harvard.edu/~fine/Tech/addrinuse.html for details. +package tcpreuse + +import ( + "errors" + "sync" + + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("reuseport-transport") + +// ErrWrongProto is returned when dialing a protocol other than tcp. +var ErrWrongProto = errors.New("can only dial TCP over IPv4 or IPv6") + +// Transport is a TCP reuse transport that reuses listener ports. +// The zero value is safe to use. +type Transport struct { + v4 network + v6 network +} + +type network struct { + mu sync.RWMutex + listeners map[*listener]struct{} + dialer dialer +} diff --git a/p2p/net/reuseport/transport_test.go b/p2p/net/reuseport/transport_test.go new file mode 100644 index 00000000..19a1fe97 --- /dev/null +++ b/p2p/net/reuseport/transport_test.go @@ -0,0 +1,280 @@ +package tcpreuse + +import ( + "net" + "runtime" + "testing" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var loopbackV4, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") +var loopbackV6, _ = ma.NewMultiaddr("/ip6/::1/tcp/0") +var unspecV6, _ = ma.NewMultiaddr("/ip6/::/tcp/0") +var unspecV4, _ = ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0") + +var globalV4 ma.Multiaddr +var globalV6 ma.Multiaddr + +func init() { + addrs, err := manet.InterfaceMultiaddrs() + if err != nil { + return + } + for _, addr := range addrs { + if !manet.IsIP6LinkLocal(addr) && !manet.IsIPLoopback(addr) { + tcp, _ := ma.NewMultiaddr("/tcp/0") + switch addr.Protocols()[0].Code { + case ma.P_IP4: + if globalV4 == nil { + globalV4 = addr.Encapsulate(tcp) + } + case ma.P_IP6: + if globalV6 == nil { + globalV6 = addr.Encapsulate(tcp) + } + } + } + } +} + +func setLingerZero(c manet.Conn) { + if runtime.GOOS == "darwin" { + c.(interface{ SetLinger(int) error }).SetLinger(0) + } +} + +func acceptOne(t *testing.T, listener manet.Listener) <-chan manet.Conn { + t.Helper() + done := make(chan manet.Conn, 1) + go func() { + defer close(done) + c, err := listener.Accept() + if err != nil { + t.Error(err) + return + } + setLingerZero(c) + done <- c + }() + return done +} + +func dialOne(t *testing.T, tr *Transport, listener manet.Listener, expected ...int) int { + t.Helper() + + connChan := acceptOne(t, listener) + c, err := tr.Dial(listener.Multiaddr()) + if err != nil { + t.Fatal(err) + } + setLingerZero(c) + port := c.LocalAddr().(*net.TCPAddr).Port + serverConn := <-connChan + serverConn.Close() + c.Close() + if len(expected) == 0 { + return port + } + for _, p := range expected { + if p == port { + return port + } + } + t.Errorf("dialed %s from %v. expected to dial from port %v", listener.Multiaddr(), c.LocalAddr(), expected) + return 0 +} + +func TestNoneAndSingle(t *testing.T) { + var trA Transport + var trB Transport + listenerA, err := trA.Listen(loopbackV4) + if err != nil { + t.Fatal(err) + } + defer listenerA.Close() + + dialOne(t, &trB, listenerA) + + listenerB, err := trB.Listen(loopbackV4) + if err != nil { + t.Fatal(err) + } + defer listenerB.Close() + + dialOne(t, &trB, listenerA, listenerB.Addr().(*net.TCPAddr).Port) +} + +func TestTwoLocal(t *testing.T) { + var trA Transport + var trB Transport + listenerA, err := trA.Listen(loopbackV4) + if err != nil { + t.Fatal(err) + } + defer listenerA.Close() + + listenerB1, err := trB.Listen(loopbackV4) + if err != nil { + t.Fatal(err) + } + defer listenerB1.Close() + + listenerB2, err := trB.Listen(loopbackV4) + if err != nil { + t.Fatal(err) + } + defer listenerB2.Close() + + dialOne(t, &trB, listenerA, + listenerB1.Addr().(*net.TCPAddr).Port, + listenerB2.Addr().(*net.TCPAddr).Port) +} + +func TestGlobalPreferenceV4(t *testing.T) { + if globalV4 == nil { + t.Skip("no global IPv4 addresses configured") + return + } + t.Logf("when listening on %v, should prefer %v over %v", loopbackV4, loopbackV4, globalV4) + testPrefer(t, loopbackV4, loopbackV4, globalV4) + t.Logf("when listening on %v, should prefer %v over %v", loopbackV4, unspecV4, globalV4) + testPrefer(t, loopbackV4, unspecV4, globalV4) + + t.Logf("when listening on %v, should prefer %v over %v", globalV4, unspecV4, loopbackV4) + testPrefer(t, globalV4, unspecV4, loopbackV4) +} + +func TestGlobalPreferenceV6(t *testing.T) { + if globalV6 == nil { + t.Skip("no global IPv6 addresses configured") + return + } + testPrefer(t, loopbackV6, loopbackV6, globalV6) + testPrefer(t, loopbackV6, unspecV6, globalV6) + + testPrefer(t, globalV6, unspecV6, loopbackV6) +} + +func TestLoopbackPreference(t *testing.T) { + testPrefer(t, loopbackV4, loopbackV4, unspecV4) + testPrefer(t, loopbackV6, loopbackV6, unspecV6) +} + +func testPrefer(t *testing.T, listen, prefer, avoid ma.Multiaddr) { + var trA Transport + var trB Transport + listenerA, err := trA.Listen(listen) + if err != nil { + t.Fatal(err) + } + defer listenerA.Close() + + listenerB1, err := trB.Listen(avoid) + if err != nil { + t.Fatal(err) + } + defer listenerB1.Close() + + dialOne(t, &trB, listenerA, listenerB1.Addr().(*net.TCPAddr).Port) + + listenerB2, err := trB.Listen(prefer) + if err != nil { + t.Fatal(err) + } + defer listenerB2.Close() + + dialOne(t, &trB, listenerA, listenerB2.Addr().(*net.TCPAddr).Port) + + // Closing the listener should reset the dialer. + listenerB2.Close() + + dialOne(t, &trB, listenerA, listenerB1.Addr().(*net.TCPAddr).Port) +} + +func TestV6V4(t *testing.T) { + if runtime.GOOS == "darwin" { + t.Skip("This test is failing on OSX: https://github.com/libp2p/go-reuseport-transport/issues/40") + } + testUseFirst(t, loopbackV4, loopbackV4, loopbackV6) + testUseFirst(t, loopbackV6, loopbackV6, loopbackV4) +} + +func TestGlobalToGlobal(t *testing.T) { + if globalV4 == nil { + t.Skip("no globalV4 addresses configured") + return + } + testUseFirst(t, globalV4, globalV4, loopbackV4) + testUseFirst(t, globalV6, globalV6, loopbackV6) +} + +func testUseFirst(t *testing.T, listen, use, never ma.Multiaddr) { + var trA Transport + var trB Transport + listenerA, err := trA.Listen(globalV4) + if err != nil { + t.Fatal(err) + } + defer listenerA.Close() + + listenerB1, err := trB.Listen(loopbackV4) + if err != nil { + t.Fatal(err) + } + defer listenerB1.Close() + + // It works (random port) + dialOne(t, &trB, listenerA) + + listenerB2, err := trB.Listen(globalV4) + if err != nil { + t.Fatal(err) + } + defer listenerB2.Close() + + // Uses globalV4 port. + dialOne(t, &trB, listenerA, listenerB2.Addr().(*net.TCPAddr).Port) + + // Closing the listener should reset the dialer. + listenerB2.Close() + + // It still works. + dialOne(t, &trB, listenerA) +} + +func TestDuplicateGlobal(t *testing.T) { + if globalV4 == nil { + t.Skip("no globalV4 addresses configured") + return + } + + var trA Transport + var trB Transport + listenerA, err := trA.Listen(globalV4) + if err != nil { + t.Fatal(err) + } + defer listenerA.Close() + + listenerB1, err := trB.Listen(globalV4) + if err != nil { + t.Fatal(err) + } + defer listenerB1.Close() + + listenerB2, err := trB.Listen(globalV4) + if err != nil { + t.Fatal(err) + } + defer listenerB2.Close() + + // Check which port we're using + port := dialOne(t, &trB, listenerA) + + // Check consistency + for i := 0; i < 10; i++ { + dialOne(t, &trB, listenerA, port) + } +}