Update alias checks on local add and remove

This commit is contained in:
Freddy 2019-04-24 12:17:06 -06:00 committed by GitHub
parent 1a7406885b
commit f2213f60e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 108 additions and 12 deletions

View File

@ -548,6 +548,9 @@ func (l *State) removeCheckLocked(id types.CheckID) error {
return fmt.Errorf("Check %q does not exist", id)
}
// If this is a check for an aliased service, then notify the waiters.
l.notifyIfAliased(c.Check.ServiceID)
// To remove the check on the server we need the token.
// Therefore, we mark the service as deleted and keep the
// entry around until it is actually removed.
@ -616,18 +619,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
}
// If this is a check for an aliased service, then notify the waiters.
if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 {
for _, notifyCh := range aliases {
// Do not block. All notify channels should be buffered to at
// least 1 in which case not-blocking does not result in loss
// of data because a failed send means a notification is
// already queued. This must be called with the lock held.
select {
case notifyCh <- struct{}{}:
default:
}
}
}
l.notifyIfAliased(c.Check.ServiceID)
// Update status and mark out of sync
c.Check.Status = status
@ -685,6 +677,10 @@ func (l *State) SetCheckState(c *CheckState) {
func (l *State) setCheckStateLocked(c *CheckState) {
l.checks[c.Check.CheckID] = c
// If this is a check for an aliased service, then notify the waiters.
l.notifyIfAliased(c.Check.ServiceID)
l.TriggerSyncChanges()
}
@ -1474,3 +1470,19 @@ func (l *State) syncNodeInfo() error {
return err
}
}
// notifyIfAliased will notify waiters if this is a check for an aliased service
func (l *State) notifyIfAliased(serviceID string) {
if aliases, ok := l.checkAliases[serviceID]; ok && len(aliases) > 0 {
for _, notifyCh := range aliases {
// Do not block. All notify channels should be buffered to at
// least 1 in which case not-blocking does not result in loss
// of data because a failed send means a notification is
// already queued. This must be called with the lock held.
select {
case notifyCh <- struct{}{}:
default:
}
}
}
}

View File

@ -2211,6 +2211,90 @@ func TestStateProxyRestore(t *testing.T) {
assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
}
// Test that alias check is updated after AddCheck, UpdateCheck, and RemoveCheck for the same service id
func TestAliasNotifications_local(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register service with a failing TCP check
svcID := "socat"
srv := &structs.NodeService{
ID: svcID,
Service: "echo",
Tags: []string{},
Address: "127.0.0.10",
Port: 8080,
}
a.State.AddService(srv, "")
scID := "socat-sidecar-proxy"
sc := &structs.NodeService{
ID: scID,
Service: scID,
Tags: []string{},
Address: "127.0.0.10",
Port: 9090,
}
a.State.AddService(sc, "")
tcpID := types.CheckID("service:socat-tcp")
chk0 := &structs.HealthCheck{
Node: "",
CheckID: tcpID,
Name: "tcp check",
Status: api.HealthPassing,
ServiceID: svcID,
}
a.State.AddCheck(chk0, "")
// Register an alias for the service
proxyID := types.CheckID("service:socat-sidecar-proxy:2")
chk1 := &structs.HealthCheck{
Node: "",
CheckID: proxyID,
Name: "Connect Sidecar Aliasing socat",
Status: api.HealthPassing,
ServiceID: scID,
}
chkt := &structs.CheckType{
AliasService: svcID,
}
require.NoError(t, a.AddCheck(chk1, chkt, true, "", agent.ConfigSourceLocal))
// Add a failing check to the same service ID, alias should also fail
maintID := types.CheckID("service:socat-maintenance")
chk2 := &structs.HealthCheck{
Node: "",
CheckID: maintID,
Name: "socat:Service Maintenance Mode",
Status: api.HealthCritical,
ServiceID: svcID,
}
a.State.AddCheck(chk2, "")
retry.Run(t, func(r *retry.R) {
require.Equal(r, api.HealthCritical, a.State.Check(proxyID).Status)
})
// Remove the failing check, alias should pass
a.State.RemoveCheck(maintID)
retry.Run(t, func(r *retry.R) {
require.Equal(r, api.HealthPassing, a.State.Check(proxyID).Status)
})
// Update TCP check to failing, alias should fail
a.State.UpdateCheck(tcpID, api.HealthCritical, "")
retry.Run(t, func(r *retry.R) {
require.Equal(r, api.HealthCritical, a.State.Check(proxyID).Status)
})
}
// drainCh drains a channel by reading messages until it would block.
func drainCh(ch chan struct{}) {
for {