agent/local: support local alias checks

This commit is contained in:
Mitchell Hashimoto 2018-06-30 07:23:47 -07:00
parent 4a67beb734
commit 7543d270e2
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
2 changed files with 106 additions and 4 deletions

View File

@ -170,8 +170,9 @@ type State struct {
// Services tracks the local services
services map[string]*ServiceState
// Checks tracks the local checks
checks map[types.CheckID]*CheckState
// Checks tracks the local checks. checkAliases are aliased checks.
checks map[types.CheckID]*CheckState
checkAliases map[string]map[types.CheckID]chan<- struct{}
// metadata tracks the node metadata fields
metadata map[string]string
@ -205,6 +206,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
logger: lg,
services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState),
checkAliases: make(map[string]map[types.CheckID]chan<- struct{}),
metadata: make(map[string]string),
tokens: tokens,
managedProxies: make(map[string]*ManagedProxy),
@ -406,6 +408,40 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
return nil
}
// AddAliasCheck creates an alias check. When any check for the srcServiceID
// is changed, checkID will reflect that using the same semantics as
// checks.CheckAlias.
//
// This is a local optimization so that the Alias check doesn't need to
// use blocking queries against the remote server for check updates for
// local services.
func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error {
l.Lock()
defer l.Unlock()
m, ok := l.checkAliases[srcServiceID]
if !ok {
m = make(map[types.CheckID]chan<- struct{})
l.checkAliases[srcServiceID] = m
}
m[checkID] = notifyCh
return nil
}
// RemoveAliasCheck removes the mapping for the alias check.
func (l *State) RemoveAliasCheck(checkID types.CheckID, srcServiceID string) {
l.Lock()
defer l.Unlock()
if m, ok := l.checkAliases[srcServiceID]; ok {
delete(m, checkID)
if len(m) == 0 {
delete(l.checkAliases, srcServiceID)
}
}
}
// RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered
// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well.
@ -486,6 +522,20 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
return
}
// If this is a check for an aliased service, then notify the waiters.
if aliases, ok := l.checkAliases[c.Check.ServiceID]; ok && len(aliases) > 0 {
for _, notifyCh := range aliases {
// Do not block. All notify channels should be buffered to at
// least 1 in which case not-blocking does not result in loss
// of data because a failed send means a notification is
// already queued.
select {
case notifyCh <- struct{}{}:
default:
}
}
}
// Update status and mark out of sync
c.Check.Status = status
c.Check.Output = output

View File

@ -11,8 +11,6 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/local"
@ -23,6 +21,7 @@ import (
"github.com/hashicorp/consul/types"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAgentAntiEntropy_Services(t *testing.T) {
@ -1606,6 +1605,59 @@ func TestAgent_AddCheckFailure(t *testing.T) {
}
}
func TestAgent_AliasCheck(t *testing.T) {
t.Parallel()
require := require.New(t)
cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store))
l.TriggerSyncChanges = func() {}
// Add checks
require.NoError(l.AddService(&structs.NodeService{Service: "s1"}, ""))
require.NoError(l.AddService(&structs.NodeService{Service: "s2"}, ""))
require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c1"), ServiceID: "s1"}, ""))
require.NoError(l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("c2"), ServiceID: "s2"}, ""))
// Add an alias
notifyCh := make(chan struct{}, 1)
require.NoError(l.AddAliasCheck(types.CheckID("a1"), "s1", notifyCh))
// Update and verify we get notified
l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "")
select {
case <-notifyCh:
case <-time.After(100 * time.Millisecond):
t.Fatal("notify not received")
}
// Update again and verify we do not get notified
l.UpdateCheck(types.CheckID("c1"), api.HealthCritical, "")
select {
case <-notifyCh:
t.Fatal("notify received")
case <-time.After(50 * time.Millisecond):
}
// Update other check and verify we do not get notified
l.UpdateCheck(types.CheckID("c2"), api.HealthCritical, "")
select {
case <-notifyCh:
t.Fatal("notify received")
case <-time.After(50 * time.Millisecond):
}
// Update change and verify we get notified
l.UpdateCheck(types.CheckID("c1"), api.HealthPassing, "")
select {
case <-notifyCh:
case <-time.After(100 * time.Millisecond):
t.Fatal("notify not received")
}
}
func TestAgent_sendCoordinate(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t.Name(), `