mirror of https://github.com/status-im/consul.git
agent: fix several data races and bugs related to node-local alias checks (#5876)
The observed bug was that a full restart of a consul datacenter (servers and clients) in conjunction with a restart of a connect-flavored application with bring-your-own-service-registration logic would very frequently cause the envoy sidecar service check to never reflect the aliased service. Over the course of investigation several bugs and unfortunate interactions were corrected: (1) local.CheckState objects were only shallow copied, but the key piece of data that gets read and updated is one of the things not copied (the underlying Check with a Status field). When the stock code was run with the race detector enabled this highly-relevant-to-the-test-scenario field was found to be racy. Changes: a) update the existing Clone method to include the Check field b) copy-on-write when those fields need to change rather than incrementally updating them in place. This made the observed behavior occur slightly less often. (2) If anything about how the runLocal method for node-local alias check logic was ever flawed, there was no fallback option. Those checks are purely edge-triggered and failure to properly notice a single edge transition would leave the alias check incorrect until the next flap of the aliased check. The change was to introduce a fallback timer to act as a control loop to double check the alias check matches the aliased check every minute (borrowing the duration from the non-local alias check logic body). This made the observed behavior eventually go away when it did occur. (3) Originally I thought there were two main actions involved in the data race: A. The act of adding the original check (from disk recovery) and its first health evaluation. B. The act of the HTTP API requests coming in and resetting the local state when re-registering the same services and checks. It took awhile for me to realize that there's a third action at work: C. The goroutines associated with the original check and the later checks. The actual sequence of actions that was causing the bad behavior was that the API actions result in the original check to be removed and re-added _without waiting for the original goroutine to terminate_. This means for brief windows of time during check definition edits there are two goroutines that can be sending updates for the alias check status. In extremely unlikely scenarios the original goroutine sees the aliased check start up in `critical` before being removed but does not get the notification about the nearly immediate update of that check to `passing`. This is interlaced wit the new goroutine coming up, initializing its base case to `passing` from the current state and then listening for new notifications of edge triggers. If the original goroutine "finishes" its update, it then commits one more write into the local state of `critical` and exits leaving the alias check no longer reflecting the underlying check. The correction here is to enforce that the old goroutines must terminate before spawning the new one for alias checks.
This commit is contained in:
parent
6b31482333
commit
40336fd353
|
@ -2,6 +2,7 @@ package agent
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -23,6 +25,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
@ -1125,6 +1128,184 @@ func TestAgent_AddCheck_GRPC(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
|
||||
// t.Parallel() don't even think about making this parallel
|
||||
|
||||
// This test is very contrived and tests for the absence of race conditions
|
||||
// related to the implementation of alias checks. As such it is slow,
|
||||
// serial, full of sleeps and retries, and not generally a great test to
|
||||
// run all of the time.
|
||||
//
|
||||
// That said it made it incredibly easy to root out various race conditions
|
||||
// quite successfully.
|
||||
//
|
||||
// The original set of races was between:
|
||||
//
|
||||
// - agent startup reloading Services and Checks from disk
|
||||
// - API requests to also re-register those same Services and Checks
|
||||
// - the goroutines for the as-yet-to-be-stopped CheckAlias goroutines
|
||||
|
||||
if os.Getenv("SLOWTEST") != "1" {
|
||||
t.Skip("skipping slow test; set SLOWTEST=1 to run")
|
||||
return
|
||||
}
|
||||
|
||||
// We do this so that the agent logs and the informational messages from
|
||||
// the test itself are interwoven properly.
|
||||
logf := func(t *testing.T, a *TestAgent, format string, args ...interface{}) {
|
||||
a.logger.Printf("[INFO] testharness: "+format, args...)
|
||||
}
|
||||
|
||||
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
|
||||
cfg := `
|
||||
server = false
|
||||
bootstrap = false
|
||||
enable_central_service_config = false
|
||||
data_dir = "` + dataDir + `"
|
||||
`
|
||||
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
|
||||
a.LogOutput = testutil.TestWriter(t)
|
||||
a.Start(t)
|
||||
defer os.RemoveAll(dataDir)
|
||||
defer a.Shutdown()
|
||||
|
||||
testCtx, testCancel := context.WithCancel(context.Background())
|
||||
defer testCancel()
|
||||
|
||||
testHTTPServer := launchHTTPCheckServer(t, testCtx)
|
||||
defer testHTTPServer.Close()
|
||||
|
||||
registerServicesAndChecks := func(t *testing.T, a *TestAgent) {
|
||||
// add one persistent service with a simple check
|
||||
require.NoError(t, a.AddService(
|
||||
&structs.NodeService{
|
||||
ID: "ping",
|
||||
Service: "ping",
|
||||
Port: 8000,
|
||||
},
|
||||
[]*structs.CheckType{
|
||||
&structs.CheckType{
|
||||
HTTP: testHTTPServer.URL,
|
||||
Method: "GET",
|
||||
Interval: 5 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
},
|
||||
},
|
||||
true, "", ConfigSourceLocal,
|
||||
))
|
||||
|
||||
// add one persistent sidecar service with an alias check in the manner
|
||||
// of how sidecar_service would add it
|
||||
require.NoError(t, a.AddService(
|
||||
&structs.NodeService{
|
||||
ID: "ping-sidecar-proxy",
|
||||
Service: "ping-sidecar-proxy",
|
||||
Port: 9000,
|
||||
},
|
||||
[]*structs.CheckType{
|
||||
&structs.CheckType{
|
||||
Name: "Connect Sidecar Aliasing ping",
|
||||
AliasService: "ping",
|
||||
},
|
||||
},
|
||||
true, "", ConfigSourceLocal,
|
||||
))
|
||||
}
|
||||
|
||||
retryUntilCheckState := func(t *testing.T, a *TestAgent, checkID string, expectedStatus string) {
|
||||
t.Helper()
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
chk := a.State.CheckState(types.CheckID(checkID))
|
||||
if chk == nil {
|
||||
r.Fatalf("check=%q is completely missing", checkID)
|
||||
}
|
||||
if chk.Check.Status != expectedStatus {
|
||||
logf(t, a, "check=%q expected status %q but got %q", checkID, expectedStatus, chk.Check.Status)
|
||||
r.Fatalf("check=%q expected status %q but got %q", checkID, expectedStatus, chk.Check.Status)
|
||||
}
|
||||
logf(t, a, "check %q has reached desired status %q", checkID, expectedStatus)
|
||||
})
|
||||
}
|
||||
|
||||
registerServicesAndChecks(t, a)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
retryUntilCheckState(t, a, "service:ping", api.HealthPassing)
|
||||
retryUntilCheckState(t, a, "service:ping-sidecar-proxy", api.HealthPassing)
|
||||
|
||||
logf(t, a, "==== POWERING DOWN ORIGINAL ====")
|
||||
|
||||
require.NoError(t, a.Shutdown())
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
futureHCL := cfg + `
|
||||
node_id = "` + string(a.Config.NodeID) + `"
|
||||
node_name = "` + a.Config.NodeName + `"
|
||||
`
|
||||
|
||||
restartOnce := func(idx int, t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
// Reload and retain former NodeID and data directory.
|
||||
a2 := &TestAgent{Name: t.Name(), HCL: futureHCL, DataDir: dataDir}
|
||||
a2.LogOutput = testutil.TestWriter(t)
|
||||
a2.Start(t)
|
||||
defer a2.Shutdown()
|
||||
a = nil
|
||||
|
||||
// reregister during standup; we use an adjustable timing to try and force a race
|
||||
sleepDur := time.Duration(idx+1) * 500 * time.Millisecond
|
||||
time.Sleep(sleepDur)
|
||||
logf(t, a2, "re-registering checks and services after a delay of %v", sleepDur)
|
||||
for i := 0; i < 20; i++ { // RACE RACE RACE!
|
||||
registerServicesAndChecks(t, a2)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
retryUntilCheckState(t, a2, "service:ping", api.HealthPassing)
|
||||
|
||||
logf(t, a2, "giving the alias check a chance to notice...")
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
retryUntilCheckState(t, a2, "service:ping-sidecar-proxy", api.HealthPassing)
|
||||
}
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
name := "restart-" + strconv.Itoa(i)
|
||||
ok := t.Run(name, func(t *testing.T) {
|
||||
restartOnce(i, t)
|
||||
})
|
||||
require.True(t, ok, name+" failed")
|
||||
}
|
||||
}
|
||||
|
||||
func launchHTTPCheckServer(t *testing.T, ctx context.Context) *httptest.Server {
|
||||
ports := freeport.GetT(t, 1)
|
||||
port := ports[0]
|
||||
|
||||
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
|
||||
|
||||
var lc net.ListenConfig
|
||||
listener, err := lc.Listen(ctx, "tcp", addr)
|
||||
require.NoError(t, err)
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("OK\n"))
|
||||
})
|
||||
|
||||
srv := &httptest.Server{
|
||||
Listener: listener,
|
||||
Config: &http.Server{Handler: handler},
|
||||
}
|
||||
srv.Start()
|
||||
return srv
|
||||
}
|
||||
|
||||
func TestAgent_AddCheck_Alias(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -33,6 +33,8 @@ type CheckAlias struct {
|
|||
stop bool
|
||||
stopCh chan struct{}
|
||||
stopLock sync.Mutex
|
||||
|
||||
stopWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// AliasNotifier is a CheckNotifier specifically for the Alias check.
|
||||
|
@ -58,15 +60,24 @@ func (c *CheckAlias) Start() {
|
|||
// Stop is used to stop the check.
|
||||
func (c *CheckAlias) Stop() {
|
||||
c.stopLock.Lock()
|
||||
defer c.stopLock.Unlock()
|
||||
if !c.stop {
|
||||
c.stop = true
|
||||
close(c.stopCh)
|
||||
}
|
||||
c.stopLock.Unlock()
|
||||
|
||||
// Wait until the associated goroutine is definitely complete before
|
||||
// returning to the caller. This is to prevent the new and old checks from
|
||||
// both updating the state of the alias check using possibly stale
|
||||
// information.
|
||||
c.stopWg.Wait()
|
||||
}
|
||||
|
||||
// run is invoked in a goroutine until Stop() is called.
|
||||
func (c *CheckAlias) run(stopCh chan struct{}) {
|
||||
c.stopWg.Add(1)
|
||||
defer c.stopWg.Done()
|
||||
|
||||
// If we have a specific node set, then use a blocking query
|
||||
if c.Node != "" {
|
||||
c.runQuery(stopCh)
|
||||
|
@ -85,13 +96,26 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
|
|||
c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh)
|
||||
defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID)
|
||||
|
||||
// maxDurationBetweenUpdates is maximum time we go between explicit
|
||||
// notifications before we re-query the aliased service checks anyway. This
|
||||
// helps in the case we miss an edge triggered event and the alias does not
|
||||
// accurately reflect the underlying service health status.
|
||||
const maxDurationBetweenUpdates = 1 * time.Minute
|
||||
|
||||
var refreshTimer <-chan time.Time
|
||||
extendRefreshTimer := func() {
|
||||
refreshTimer = time.After(maxDurationBetweenUpdates)
|
||||
}
|
||||
|
||||
updateStatus := func() {
|
||||
checks := c.Notify.Checks()
|
||||
checksList := make([]*structs.HealthCheck, 0, len(checks))
|
||||
for _, chk := range checks {
|
||||
checksList = append(checksList, chk)
|
||||
}
|
||||
|
||||
c.processChecks(checksList)
|
||||
extendRefreshTimer()
|
||||
}
|
||||
|
||||
// Immediately run to get the current state of the target service
|
||||
|
@ -99,6 +123,8 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-refreshTimer:
|
||||
updateStatus()
|
||||
case <-notifyCh:
|
||||
updateStatus()
|
||||
case <-stopCh:
|
||||
|
@ -203,6 +229,8 @@ func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
|
|||
msg = "All checks passing."
|
||||
}
|
||||
|
||||
// TODO(rb): if no matching checks found should this default to critical?
|
||||
|
||||
// Update our check value
|
||||
c.Notify.UpdateCheck(c.CheckID, health, msg)
|
||||
}
|
||||
|
|
|
@ -70,6 +70,9 @@ func (s *ServiceState) Clone() *ServiceState {
|
|||
// CheckState describes the state of a health check record.
|
||||
type CheckState struct {
|
||||
// Check is the local copy of the health check record.
|
||||
//
|
||||
// Must Clone() the overall CheckState before mutating this. After mutation
|
||||
// reinstall into the checks map.
|
||||
Check *structs.HealthCheck
|
||||
|
||||
// Token is the ACL record to update or delete the health check
|
||||
|
@ -95,12 +98,15 @@ type CheckState struct {
|
|||
Deleted bool
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of the object. The check record and the
|
||||
// defer timer still point to the original values and must not be
|
||||
// modified.
|
||||
// Clone returns a shallow copy of the object.
|
||||
//
|
||||
// The defer timer still points to the original value and must not be modified.
|
||||
func (c *CheckState) Clone() *CheckState {
|
||||
c2 := new(CheckState)
|
||||
*c2 = *c
|
||||
if c.Check != nil {
|
||||
c2.Check = c.Check.Clone()
|
||||
}
|
||||
return c2
|
||||
}
|
||||
|
||||
|
@ -590,6 +596,18 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
|||
return
|
||||
}
|
||||
|
||||
// Ensure we only mutate a copy of the check state and put the finalized
|
||||
// version into the checks map when complete.
|
||||
//
|
||||
// Note that we are relying upon the earlier deferred mutex unlock to
|
||||
// happen AFTER this defer. As per the Go spec this is true, but leaving
|
||||
// this note here for the future in case of any refactorings which may not
|
||||
// notice this relationship.
|
||||
c = c.Clone()
|
||||
defer func(c *CheckState) {
|
||||
l.checks[id] = c
|
||||
}(c)
|
||||
|
||||
// Defer a sync if the output has changed. This is an optimization around
|
||||
// frequent updates of output. Instead, we update the output internally,
|
||||
// and periodically do a write-back to the servers. If there is a status
|
||||
|
@ -651,9 +669,9 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
|
|||
return m
|
||||
}
|
||||
|
||||
// CheckState returns a shallow copy of the current health check state
|
||||
// record. The health check record and the deferred check still point to
|
||||
// the original values and must not be modified.
|
||||
// CheckState returns a shallow copy of the current health check state record.
|
||||
//
|
||||
// The defer timer still points to the original value and must not be modified.
|
||||
func (l *State) CheckState(id types.CheckID) *CheckState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
@ -685,8 +703,9 @@ func (l *State) setCheckStateLocked(c *CheckState) {
|
|||
}
|
||||
|
||||
// CheckStates returns a shallow copy of all health check state records.
|
||||
// The health check records and the deferred checks still point to
|
||||
// the original values and must not be modified.
|
||||
// The map contains a shallow copy of the current check states.
|
||||
//
|
||||
// The defer timers still point to the original values and must not be modified.
|
||||
func (l *State) CheckStates() map[types.CheckID]*CheckState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
@ -703,9 +722,9 @@ func (l *State) CheckStates() map[types.CheckID]*CheckState {
|
|||
|
||||
// CriticalCheckStates returns the locally registered checks that the
|
||||
// agent is aware of and are being kept in sync with the server.
|
||||
// The map contains a shallow copy of the current check states but
|
||||
// references to the actual check definition which must not be
|
||||
// modified.
|
||||
// The map contains a shallow copy of the current check states.
|
||||
//
|
||||
// The defer timers still point to the original values and must not be modified.
|
||||
func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
|
|
@ -1041,7 +1041,8 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// Clone returns a distinct clone of the HealthCheck.
|
||||
// Clone returns a distinct clone of the HealthCheck. Note that the
|
||||
// "ServiceTags" and "Definition.Header" field are not deep copied.
|
||||
func (c *HealthCheck) Clone() *HealthCheck {
|
||||
clone := new(HealthCheck)
|
||||
*clone = *c
|
||||
|
|
|
@ -126,14 +126,15 @@ func (a *TestAgent) Start(t *testing.T) *TestAgent {
|
|||
require.NoError(err, fmt.Sprintf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err))
|
||||
hclDataDir = `data_dir = "` + d + `"`
|
||||
}
|
||||
id := NodeID()
|
||||
|
||||
var id string
|
||||
for i := 10; i >= 0; i-- {
|
||||
a.Config = TestConfig(
|
||||
randomPortsSource(a.UseTLS),
|
||||
config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
|
||||
config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
|
||||
)
|
||||
id = string(a.Config.NodeID)
|
||||
|
||||
// write the keyring
|
||||
if a.Key != "" {
|
||||
|
|
Loading…
Reference in New Issue