connect/proxy: fix a number of problems with Listener

We noticed that TestUpstreamListener would deadlock sometimes when run
with the race detector. While debugging this issue I found and fixed the
following problems.

1. The net.Listener was not being closed when Listener.Stop was called,
   which caused the Listener.Serve goroutine to run forever. Fixed by
   storing a reference to the net.Listener and closing it when
   Listener.Stop is called (the shutdown pattern is sketched below).
2. Call connWG.Add in the correct place. WaitGroup.Add must be called
   before starting a goroutine, not from inside the goroutine (sketched
   below).
3. Set metrics config EnableRuntimeMetrics to `false` so that we don't
   start a background goroutine in each test for no reason. There is no
   way to shut down this goroutine, and it was an added distraction while
   debugging these timeouts.
4. Two tests were calling require.NoError from a goroutine.
   require.NoError calls t.FailNow, which MUST be called from the main
   test goroutine. Instead use t.Errorf, which can be called from other
   goroutines and will still fail the test (sketched below).
5. `assertCurrentGaugeValue` was breaking out of a for loop, which
   would cause the `RWMutex.RUnlock` to be missed. Fixed by calling
   unlock before `break` (sketched below).

The core deadlock issue was fixed by https://github.com/armon/go-metrics/pull/124.
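
To make item 1 concrete, here is a minimal, self-contained sketch of the
shutdown pattern (hypothetical server/serve/stop names, not the actual
Listener code): the only way to unblock a goroutine sitting in Accept is
to close the net.Listener, and a stop flag lets the accept loop tell a
deliberate shutdown apart from a real error.

package main

import (
    "net"
    "sync/atomic"
)

// server is an illustrative stand-in for the proxy Listener.
type server struct {
    lis      net.Listener
    stopFlag int32
}

// serve blocks in Accept until the listener is closed.
func (s *server) serve() error {
    for {
        conn, err := s.lis.Accept()
        if err != nil {
            // Accept returns an error once lis.Close() is called; if we
            // initiated the close, report a clean shutdown instead.
            if atomic.LoadInt32(&s.stopFlag) == 1 {
                return nil
            }
            return err
        }
        conn.Close() // real code would hand the conn to a handler goroutine
    }
}

// stop closes the stored net.Listener; without this, the serve goroutine
// would stay blocked in Accept forever.
func (s *server) stop() {
    atomic.StoreInt32(&s.stopFlag, 1)
    s.lis.Close()
}

func main() {
    lis, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        panic(err)
    }
    s := &server{lis: lis}
    go s.serve()
    s.stop()
}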
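
Item 2 is the standard sync.WaitGroup rule: the Add has to happen before
the goroutine starts, because an Add performed inside the goroutine can
race with Wait. A tiny illustrative example, not the proxy code:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        // Correct: the counter is incremented before the goroutine
        // exists, so Wait below cannot return while work is pending.
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            fmt.Println("handled", n)
        }(i)
        // Incorrect variant: calling wg.Add(1) as the first statement
        // inside the goroutine races with wg.Wait() and can let Wait
        // return before the goroutine has been counted.
    }

    wg.Wait()
}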
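
Item 4 follows from the testing package contract: t.FailNow (and
therefore require.NoError) must run on the goroutine the test framework
started, while t.Errorf is safe to call from any goroutine. A
hypothetical test showing the shape the two proxy tests were changed to:

package example

import "testing"

// serve stands in for l.Serve(); have it return an error to see the
// failure-reporting path.
func serve() error { return nil }

func TestServeInBackground(t *testing.T) {
    done := make(chan struct{})
    go func() {
        defer close(done)
        if err := serve(); err != nil {
            // t.Errorf marks the test failed and may be called from any
            // goroutine; require.NoError would call t.FailNow, which must
            // only run on the main test goroutine.
            t.Errorf("serve failed: %v", err)
        }
    }()
    <-done
}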
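
Item 5 is a plain lock leak: breaking out of the loop after RLock but
before RUnlock leaves the read lock held, which blocks the next writer
indefinitely. An illustrative polling loop, not the real
assertCurrentGaugeValue:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.RWMutex
    gauges := map[string]float64{"consul.proxy.test.gauge": 42}

    var got float64
    for {
        mu.RLock()
        if v, ok := gauges["consul.proxy.test.gauge"]; ok {
            got = v
            // Unlock before break; breaking first would leak the read
            // lock and deadlock any later call that takes mu.Lock().
            mu.RUnlock()
            break
        }
        mu.RUnlock()
    }
    fmt.Println("gauge value:", got)
}
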
Author: Daniel Nephin
Date:   2021-04-28 14:27:55 -04:00
Parent: 146e1d3878
Commit: d18a03b07f
2 changed files with 21 additions and 12 deletions

--- a/connect/proxy/listener.go
+++ b/connect/proxy/listener.go
@@ -10,10 +10,11 @@ import (
     "time"
 
     metrics "github.com/armon/go-metrics"
+    "github.com/hashicorp/go-hclog"
+
     "github.com/hashicorp/consul/api"
     "github.com/hashicorp/consul/connect"
     "github.com/hashicorp/consul/ipaddr"
-    "github.com/hashicorp/go-hclog"
 )
 
 const (
@@ -44,6 +45,7 @@ type Listener struct {
     // `connection refused`. Retry loops and sleeps are unpleasant workarounds and
     // this is cheap and correct.
     listeningChan chan struct{}
+    listener      net.Listener
 
     logger hclog.Logger
@@ -136,14 +138,15 @@ func (l *Listener) Serve() error {
         return errors.New("serve called on a closed listener")
     }
 
-    listen, err := l.listenFunc()
+    var err error
+    l.listener, err = l.listenFunc()
     if err != nil {
         return err
     }
     close(l.listeningChan)
 
     for {
-        conn, err := listen.Accept()
+        conn, err := l.listener.Accept()
         if err != nil {
             if atomic.LoadInt32(&l.stopFlag) == 1 {
                 return nil
@@ -151,6 +154,7 @@ func (l *Listener) Serve() error {
             return err
         }
 
+        l.connWG.Add(1)
         go l.handleConn(conn)
     }
 }
@@ -158,6 +162,8 @@ func (l *Listener) Serve() error {
 // handleConn is the internal connection handler goroutine.
 func (l *Listener) handleConn(src net.Conn) {
     defer src.Close()
+    // Make sure Listener.Close waits for this conn to be cleaned up.
+    defer l.connWG.Done()
 
     dst, err := l.dialFunc()
     if err != nil {
@@ -169,11 +175,6 @@ func (l *Listener) handleConn(src net.Conn) {
     // it closes.
     defer l.trackConn()()
 
-    // Make sure Close() waits for this conn to be cleaned up. Note defer is
-    // before conn.Close() so runs after defer conn.Close().
-    l.connWG.Add(1)
-    defer l.connWG.Done()
-
     // Note no need to defer dst.Close() since conn handles that for us.
     conn := NewConn(src, dst)
     defer conn.Close()
@@ -246,6 +247,10 @@ func (l *Listener) Close() error {
         close(l.stopChan)
         // Wait for all conns to close
         l.connWG.Wait()
+
+        if l.listener != nil {
+            l.listener.Close()
+        }
     }
     return nil
 }

--- a/connect/proxy/listener_test.go
+++ b/connect/proxy/listener_test.go
@@ -26,6 +26,7 @@ func testSetupMetrics(t *testing.T) *metrics.InmemSink {
     s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
     cfg := metrics.DefaultConfig("consul.proxy.test")
     cfg.EnableHostname = false
+    cfg.EnableRuntimeMetrics = false
     metrics.NewGlobal(cfg, s)
     return s
 }
@@ -45,6 +46,7 @@ func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink,
         currentInterval.RLock()
         if len(currentInterval.Gauges) > 0 {
             got = currentInterval.Gauges[name].Value
+            currentInterval.RUnlock()
             break
         }
         currentInterval.RUnlock()
@@ -132,8 +134,9 @@ func TestPublicListener(t *testing.T) {
     // Run proxy
     go func() {
-        err := l.Serve()
-        require.NoError(t, err)
+        if err := l.Serve(); err != nil {
+            t.Errorf("failed to listen: %v", err.Error())
+        }
     }()
     defer l.Close()
     l.Wait()
 
@@ -200,8 +203,9 @@ func TestUpstreamListener(t *testing.T) {
     // Run proxy
     go func() {
-        err := l.Serve()
-        require.NoError(t, err)
+        if err := l.Serve(); err != nil {
+            t.Errorf("failed to listen: %v", err.Error())
+        }
     }()
     defer l.Close()
     l.Wait()