diff --git a/agent/local/state.go b/agent/local/state.go index 2cee7d05aa..f18bad5be3 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -548,6 +548,9 @@ func (l *State) removeCheckLocked(id types.CheckID) error { return fmt.Errorf("Check %q does not exist", id) } + // If this is a check for an aliased service, then notify the waiters. + l.notifyIfAliased(c.Check.ServiceID) + // To remove the check on the server we need the token. // Therefore, we mark the service as deleted and keep the // entry around until it is actually removed. @@ -616,18 +619,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { } // If this is a check for an aliased service, then notify the waiters. - if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 { - for _, notifyCh := range aliases { - // Do not block. All notify channels should be buffered to at - // least 1 in which case not-blocking does not result in loss - // of data because a failed send means a notification is - // already queued. This must be called with the lock held. - select { - case notifyCh <- struct{}{}: - default: - } - } - } + l.notifyIfAliased(c.Check.ServiceID) // Update status and mark out of sync c.Check.Status = status @@ -685,6 +677,10 @@ func (l *State) SetCheckState(c *CheckState) { func (l *State) setCheckStateLocked(c *CheckState) { l.checks[c.Check.CheckID] = c + + // If this is a check for an aliased service, then notify the waiters. + l.notifyIfAliased(c.Check.ServiceID) + l.TriggerSyncChanges() } @@ -1474,3 +1470,19 @@ func (l *State) syncNodeInfo() error { return err } } + +// notifyIfAliased will notify waiters if this is a check for an aliased service +func (l *State) notifyIfAliased(serviceID string) { + if aliases, ok := l.checkAliases[serviceID]; ok && len(aliases) > 0 { + for _, notifyCh := range aliases { + // Do not block. All notify channels should be buffered to at + // least 1 in which case not-blocking does not result in loss + // of data because a failed send means a notification is + // already queued. This must be called with the lock held. + select { + case notifyCh <- struct{}{}: + default: + } + } + } +} diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 046ac47f75..aa66541a5f 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -2211,6 +2211,90 @@ func TestStateProxyRestore(t *testing.T) { assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port) } +// Test that alias check is updated after AddCheck, UpdateCheck, and RemoveCheck for the same service id +func TestAliasNotifications_local(t *testing.T) { + t.Parallel() + + a := agent.NewTestAgent(t, t.Name(), "") + defer a.Shutdown() + + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + // Register service with a failing TCP check + svcID := "socat" + srv := &structs.NodeService{ + ID: svcID, + Service: "echo", + Tags: []string{}, + Address: "127.0.0.10", + Port: 8080, + } + a.State.AddService(srv, "") + + scID := "socat-sidecar-proxy" + sc := &structs.NodeService{ + ID: scID, + Service: scID, + Tags: []string{}, + Address: "127.0.0.10", + Port: 9090, + } + a.State.AddService(sc, "") + + tcpID := types.CheckID("service:socat-tcp") + chk0 := &structs.HealthCheck{ + Node: "", + CheckID: tcpID, + Name: "tcp check", + Status: api.HealthPassing, + ServiceID: svcID, + } + a.State.AddCheck(chk0, "") + + // Register an alias for the service + proxyID := types.CheckID("service:socat-sidecar-proxy:2") + chk1 := &structs.HealthCheck{ + Node: "", + CheckID: proxyID, + Name: "Connect Sidecar Aliasing socat", + Status: api.HealthPassing, + ServiceID: scID, + } + chkt := &structs.CheckType{ + AliasService: svcID, + } + require.NoError(t, a.AddCheck(chk1, chkt, true, "", agent.ConfigSourceLocal)) + + // Add a failing check to the same service ID, alias should also fail + maintID := types.CheckID("service:socat-maintenance") + chk2 := &structs.HealthCheck{ + Node: "", + CheckID: maintID, + Name: "socat:Service Maintenance Mode", + Status: api.HealthCritical, + ServiceID: svcID, + } + a.State.AddCheck(chk2, "") + + retry.Run(t, func(r *retry.R) { + require.Equal(r, api.HealthCritical, a.State.Check(proxyID).Status) + }) + + // Remove the failing check, alias should pass + a.State.RemoveCheck(maintID) + + retry.Run(t, func(r *retry.R) { + require.Equal(r, api.HealthPassing, a.State.Check(proxyID).Status) + }) + + // Update TCP check to failing, alias should fail + a.State.UpdateCheck(tcpID, api.HealthCritical, "") + + retry.Run(t, func(r *retry.R) { + require.Equal(r, api.HealthCritical, a.State.Check(proxyID).Status) + }) +} + // drainCh drains a channel by reading messages until it would block. func drainCh(ch chan struct{}) { for {