mirror of https://github.com/status-im/consul.git
connect/proxy: fixes logic bug preventing builtin/native proxy from starting upstream listeners (#10486)
Fixes #10480 Also fixed a data race in the `connect/proxy` package that was unearthed by the tests changed for this bugfix.
This commit is contained in:
parent
b7c4ec8cfd
commit
96f6ec48fa
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
connect/proxy: fixes logic bug preventing builtin/native proxy from starting upstream listeners
|
||||
```
|
|
@ -45,7 +45,10 @@ type Listener struct {
|
|||
// `connection refused`. Retry loops and sleeps are unpleasant workarounds and
|
||||
// this is cheap and correct.
|
||||
listeningChan chan struct{}
|
||||
listener net.Listener
|
||||
|
||||
// listenerLock guards access to the listener field
|
||||
listenerLock sync.Mutex
|
||||
listener net.Listener
|
||||
|
||||
logger hclog.Logger
|
||||
|
||||
|
@ -138,15 +141,17 @@ func (l *Listener) Serve() error {
|
|||
return errors.New("serve called on a closed listener")
|
||||
}
|
||||
|
||||
var err error
|
||||
l.listener, err = l.listenFunc()
|
||||
listener, err := l.listenFunc()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.setListener(listener)
|
||||
|
||||
close(l.listeningChan)
|
||||
|
||||
for {
|
||||
conn, err := l.listener.Accept()
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
if atomic.LoadInt32(&l.stopFlag) == 1 {
|
||||
return nil
|
||||
|
@ -242,16 +247,23 @@ func (l *Listener) trackConn() func() {
|
|||
|
||||
// Close terminates the listener and all active connections.
|
||||
func (l *Listener) Close() error {
|
||||
// Prevent the listener from being started.
|
||||
oldFlag := atomic.SwapInt32(&l.stopFlag, 1)
|
||||
if oldFlag == 0 {
|
||||
close(l.stopChan)
|
||||
// Wait for all conns to close
|
||||
l.connWG.Wait()
|
||||
|
||||
if l.listener != nil {
|
||||
l.listener.Close()
|
||||
}
|
||||
if oldFlag != 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the current listener and stop accepting new requests.
|
||||
if listener := l.getListener(); listener != nil {
|
||||
listener.Close()
|
||||
}
|
||||
|
||||
// Stop outstanding requests.
|
||||
close(l.stopChan)
|
||||
|
||||
// Wait for all conns to close
|
||||
l.connWG.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -264,3 +276,15 @@ func (l *Listener) Wait() {
|
|||
func (l *Listener) BindAddr() string {
|
||||
return l.bindAddr
|
||||
}
|
||||
|
||||
func (l *Listener) setListener(listener net.Listener) {
|
||||
l.listenerLock.Lock()
|
||||
l.listener = listener
|
||||
l.listenerLock.Unlock()
|
||||
}
|
||||
|
||||
func (l *Listener) getListener() net.Listener {
|
||||
l.listenerLock.Lock()
|
||||
defer l.listenerLock.Unlock()
|
||||
return l.listener
|
||||
}
|
||||
|
|
|
@ -3,10 +3,11 @@ package proxy
|
|||
import (
|
||||
"crypto/x509"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/connect"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// Proxy implements the built-in connect proxy.
|
||||
|
@ -103,8 +104,14 @@ func (p *Proxy) Serve() error {
|
|||
for _, uc := range newCfg.Upstreams {
|
||||
uc.applyDefaults()
|
||||
|
||||
if uc.LocalBindPort < 1 || uc.LocalBindSocketPath == "" {
|
||||
p.logger.Error("upstream has no local_bind_port or local_bind_socket_path. "+
|
||||
if uc.LocalBindSocketPath != "" {
|
||||
p.logger.Error("local_bind_socket_path is not supported with this proxy implementation. "+
|
||||
"Can't start upstream.", "upstream", uc.String())
|
||||
continue
|
||||
}
|
||||
|
||||
if uc.LocalBindPort < 1 {
|
||||
p.logger.Error("upstream has no local_bind_port. "+
|
||||
"Can't start upstream.", "upstream", uc.String())
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -4,7 +4,11 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
@ -23,9 +27,7 @@ func TestProxy_public(t *testing.T) {
|
|||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
ports := freeport.MustTake(1)
|
||||
ports := freeport.MustTake(2)
|
||||
defer freeport.Return(ports)
|
||||
|
||||
a := agent.NewTestAgent(t, "")
|
||||
|
@ -42,12 +44,31 @@ func TestProxy_public(t *testing.T) {
|
|||
Service: "echo",
|
||||
},
|
||||
}, nil)
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start the backend service that is being proxied
|
||||
testApp := NewTestTCPServer(t)
|
||||
defer testApp.Close()
|
||||
|
||||
upstreams := []UpstreamConfig{
|
||||
{
|
||||
DestinationName: "just-a-port",
|
||||
LocalBindPort: ports[1],
|
||||
},
|
||||
}
|
||||
|
||||
var unixSocket string
|
||||
if runtime.GOOS != "windows" {
|
||||
tempDir := testutil.TempDir(t, "consul")
|
||||
unixSocket = filepath.Join(tempDir, "test.sock")
|
||||
|
||||
upstreams = append(upstreams, UpstreamConfig{
|
||||
DestinationName: "just-a-unix-domain-socket",
|
||||
LocalBindSocketPath: unixSocket,
|
||||
LocalBindSocketMode: "0600",
|
||||
})
|
||||
}
|
||||
|
||||
// Start the proxy
|
||||
p, err := New(client, NewStaticConfigWatcher(&Config{
|
||||
ProxiedServiceName: "echo",
|
||||
|
@ -56,8 +77,9 @@ func TestProxy_public(t *testing.T) {
|
|||
BindPort: ports[0],
|
||||
LocalServiceAddress: testApp.Addr().String(),
|
||||
},
|
||||
Upstreams: upstreams,
|
||||
}), testutil.Logger(t))
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
defer p.Close()
|
||||
go p.Serve()
|
||||
|
||||
|
@ -65,7 +87,7 @@ func TestProxy_public(t *testing.T) {
|
|||
// if the proxy supports it. This is so we can verify below that the proxy _doesn't_
|
||||
// advertise `h2` support as it's only a L4 proxy.
|
||||
svc, err := connect.NewServiceWithConfig("echo", connect.Config{Client: client, ServerNextProtos: []string{"h2"}})
|
||||
require.NoError(err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a test connection to the proxy. We retry here a few times
|
||||
// since this is dependent on the agent actually starting up and setting
|
||||
|
@ -83,8 +105,25 @@ func TestProxy_public(t *testing.T) {
|
|||
|
||||
// Verify that we did not select h2 via ALPN since the proxy is layer 4 only
|
||||
tlsConn := conn.(*tls.Conn)
|
||||
require.Equal("", tlsConn.ConnectionState().NegotiatedProtocol)
|
||||
require.Equal(t, "", tlsConn.ConnectionState().NegotiatedProtocol)
|
||||
|
||||
// Connection works, test it is the right one
|
||||
TestEchoConn(t, conn, "")
|
||||
|
||||
t.Run("verify port upstream is configured", func(t *testing.T) {
|
||||
// Verify that it is listening by doing a simple TCP dial.
|
||||
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(ports[1]))
|
||||
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
_ = conn.Close()
|
||||
})
|
||||
|
||||
t.Run("verify unix domain socket upstream will never work", func(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
// Ensure the socket was not created
|
||||
require.NoFileExists(t, unixSocket)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue