mirror of https://github.com/status-im/consul.git
agent/checks: add Alias check type
This commit is contained in:
parent
3dd54c3e53
commit
632e4a2c69
|
@ -0,0 +1,135 @@
|
||||||
|
package checks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Backoff tuning for alias checks.
const (
	// checkAliasBackoffMin is the number of consecutive failed attempts that
	// must be exceeded before the run loop starts waiting between attempts.
	checkAliasBackoffMin = 3

	// checkAliasBackoffMaxWait is the upper bound on a single backoff wait.
	checkAliasBackoffMaxWait = time.Minute
)
|
||||||
|
|
||||||
|
// CheckAlias is a check type that aliases the health of another service
|
||||||
|
// instance. If the service aliased has any critical health checks, then
|
||||||
|
// this check is critical. If the service has no critical but warnings,
|
||||||
|
// then this check is warning, and if a service has only passing checks, then
|
||||||
|
// this check is passing.
|
||||||
|
type CheckAlias struct {
|
||||||
|
Node string // Node name of the service. If empty, assumed to be this node.
|
||||||
|
ServiceID string // ID (not name) of the service to alias
|
||||||
|
|
||||||
|
CheckID types.CheckID // ID of this check
|
||||||
|
RPC RPC // Used to query remote server if necessary
|
||||||
|
Notify CheckNotifier // For updating the check state
|
||||||
|
|
||||||
|
stop bool
|
||||||
|
stopCh chan struct{}
|
||||||
|
stopLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start is used to start a check ttl, runs until Stop() func (c *CheckAlias) Start() {
|
||||||
|
func (c *CheckAlias) Start() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
c.stop = false
|
||||||
|
c.stopCh = make(chan struct{})
|
||||||
|
go c.run(c.stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop is used to stop a check ttl.
|
||||||
|
func (c *CheckAlias) Stop() {
|
||||||
|
c.stopLock.Lock()
|
||||||
|
defer c.stopLock.Unlock()
|
||||||
|
if !c.stop {
|
||||||
|
c.stop = true
|
||||||
|
close(c.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is invoked in a goroutine until Stop() is called.
|
||||||
|
func (c *CheckAlias) run(stopCh chan struct{}) {
|
||||||
|
args := structs.NodeSpecificRequest{Node: c.Node}
|
||||||
|
args.AllowStale = true
|
||||||
|
args.MaxQueryTime = 1 * time.Minute
|
||||||
|
|
||||||
|
var attempt uint
|
||||||
|
for {
|
||||||
|
// Check if we're stopped. We fallthrough and block otherwise,
|
||||||
|
// which has a maximum time set above so we'll always check for
|
||||||
|
// stop within a reasonable amount of time.
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backoff if we have to
|
||||||
|
if attempt > checkAliasBackoffMin {
|
||||||
|
waitTime := (1 << (attempt - checkAliasBackoffMin)) * time.Second
|
||||||
|
if waitTime > checkAliasBackoffMaxWait {
|
||||||
|
waitTime = checkAliasBackoffMaxWait
|
||||||
|
}
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the current health checks for the specified node.
|
||||||
|
//
|
||||||
|
// NOTE(mitchellh): This currently returns ALL health checks for
|
||||||
|
// a node even though we also have the service ID. This can be
|
||||||
|
// optimized if we introduce a new RPC endpoint to filter both,
|
||||||
|
// but for blocking queries isn't that more efficient since the checks
|
||||||
|
// index is global to the cluster.
|
||||||
|
var out structs.IndexedHealthChecks
|
||||||
|
if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil {
|
||||||
|
attempt++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
attempt = 0 // Reset the attempts so we don't backoff the next
|
||||||
|
|
||||||
|
// Set our index for the next request
|
||||||
|
args.MinQueryIndex = out.Index
|
||||||
|
|
||||||
|
// We want to ensure that we're always blocking on subsequent requests
|
||||||
|
// to avoid hot loops. Index 1 is always safe since the min raft index
|
||||||
|
// is at least 5. Note this shouldn't happen but protecting against this
|
||||||
|
// case is safer than a 100% CPU loop.
|
||||||
|
if args.MinQueryIndex < 1 {
|
||||||
|
args.MinQueryIndex = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
health := api.HealthPassing
|
||||||
|
msg := "All checks passing."
|
||||||
|
if len(out.HealthChecks) == 0 {
|
||||||
|
// No health checks means we're healthy by default
|
||||||
|
msg = "No checks found."
|
||||||
|
}
|
||||||
|
for _, chk := range out.HealthChecks {
|
||||||
|
if chk.ServiceID != c.ServiceID || chk.Node != c.Node {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
|
||||||
|
health = chk.Status
|
||||||
|
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
|
||||||
|
|
||||||
|
// Critical checks exit the for loop immediately since we
|
||||||
|
// know that this is the health state. Warnings do not since
|
||||||
|
// there may still be a critical check.
|
||||||
|
if chk.Status == api.HealthCritical {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update our check value
|
||||||
|
c.Notify.UpdateCheck(c.CheckID, health, msg)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,267 @@
|
||||||
|
package checks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/mock"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
|
//"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test that we do a backoff on error.
|
||||||
|
func TestCheckAlias_remoteErrBackoff(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
notify := mock.NewNotify()
|
||||||
|
chkID := types.CheckID("foo")
|
||||||
|
rpc := &mockRPC{}
|
||||||
|
chk := &CheckAlias{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
CheckID: chkID,
|
||||||
|
Notify: notify,
|
||||||
|
RPC: rpc,
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc.Reply.Store(fmt.Errorf("failure"))
|
||||||
|
|
||||||
|
chk.Start()
|
||||||
|
defer chk.Stop()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if got, want := atomic.LoadUint32(&rpc.Calls), uint32(6); got > want {
|
||||||
|
t.Fatalf("got %d updates want at most %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No remote health checks should result in passing on the check.
|
||||||
|
func TestCheckAlias_remoteNoChecks(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
notify := mock.NewNotify()
|
||||||
|
chkID := types.CheckID("foo")
|
||||||
|
rpc := &mockRPC{}
|
||||||
|
chk := &CheckAlias{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
CheckID: chkID,
|
||||||
|
Notify: notify,
|
||||||
|
RPC: rpc,
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc.Reply.Store(structs.IndexedHealthChecks{})
|
||||||
|
|
||||||
|
chk.Start()
|
||||||
|
defer chk.Stop()
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
if got, want := notify.State(chkID), api.HealthPassing; got != want {
|
||||||
|
r.Fatalf("got state %q want %q", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only passing should result in passing
|
||||||
|
func TestCheckAlias_remotePassing(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
notify := mock.NewNotify()
|
||||||
|
chkID := types.CheckID("foo")
|
||||||
|
rpc := &mockRPC{}
|
||||||
|
chk := &CheckAlias{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
CheckID: chkID,
|
||||||
|
Notify: notify,
|
||||||
|
RPC: rpc,
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc.Reply.Store(structs.IndexedHealthChecks{
|
||||||
|
HealthChecks: []*structs.HealthCheck{
|
||||||
|
// Should ignore non-matching node
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "A",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Should ignore non-matching service
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "db",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Match
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthPassing,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
chk.Start()
|
||||||
|
defer chk.Stop()
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
if got, want := notify.State(chkID), api.HealthPassing; got != want {
|
||||||
|
r.Fatalf("got state %q want %q", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// If any checks are critical, it should be critical
|
||||||
|
func TestCheckAlias_remoteCritical(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
notify := mock.NewNotify()
|
||||||
|
chkID := types.CheckID("foo")
|
||||||
|
rpc := &mockRPC{}
|
||||||
|
chk := &CheckAlias{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
CheckID: chkID,
|
||||||
|
Notify: notify,
|
||||||
|
RPC: rpc,
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc.Reply.Store(structs.IndexedHealthChecks{
|
||||||
|
HealthChecks: []*structs.HealthCheck{
|
||||||
|
// Should ignore non-matching node
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "A",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Should ignore non-matching service
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "db",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Match
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthPassing,
|
||||||
|
},
|
||||||
|
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
chk.Start()
|
||||||
|
defer chk.Stop()
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
if got, want := notify.State(chkID), api.HealthCritical; got != want {
|
||||||
|
r.Fatalf("got state %q want %q", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no checks are critical and at least one is warning, then it should warn
|
||||||
|
func TestCheckAlias_remoteWarning(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
notify := mock.NewNotify()
|
||||||
|
chkID := types.CheckID("foo")
|
||||||
|
rpc := &mockRPC{}
|
||||||
|
chk := &CheckAlias{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
CheckID: chkID,
|
||||||
|
Notify: notify,
|
||||||
|
RPC: rpc,
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc.Reply.Store(structs.IndexedHealthChecks{
|
||||||
|
HealthChecks: []*structs.HealthCheck{
|
||||||
|
// Should ignore non-matching node
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "A",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Should ignore non-matching service
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "db",
|
||||||
|
Status: api.HealthCritical,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Match
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthPassing,
|
||||||
|
},
|
||||||
|
|
||||||
|
&structs.HealthCheck{
|
||||||
|
Node: "remote",
|
||||||
|
ServiceID: "web",
|
||||||
|
Status: api.HealthWarning,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
chk.Start()
|
||||||
|
defer chk.Stop()
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
if got, want := notify.State(chkID), api.HealthWarning; got != want {
|
||||||
|
r.Fatalf("got state %q want %q", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// mockRPC is a test implementation of RPC. Its atomic.Value fields may be
// set concurrently with a running check; a new value takes effect on the
// next RPC call.
type mockRPC struct {
	// Calls is the number of RPC calls made so far. Read-only for tests.
	Calls uint32

	// Args holds the args passed to the most recent call. Read-only for tests.
	Args atomic.Value

	// Reply is the value copied into the caller's reply. Write-only for
	// tests. If it holds a value of type "error", that error is returned
	// from the RPC call instead.
	Reply atomic.Value
}

// RPC records the call, pauses briefly, and then either returns the
// configured error or copies the configured reply into the value that reply
// points to. With no reply configured, the pointed-to value is left zeroed.
func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error {
	atomic.AddUint32(&m.Calls, 1)
	m.Args.Store(args)

	// We don't adhere to blocking queries, so this helps prevent
	// too much CPU usage on the check loop.
	time.Sleep(10 * time.Millisecond)

	// Writing the reply through reflection is a much condensed version of
	// what net/rpc does internally.
	ptr := reflect.ValueOf(reply)
	if ptr.Kind() != reflect.Ptr {
		return fmt.Errorf("RPC reply must be pointer")
	}
	target := ptr.Elem()
	target.Set(reflect.Zero(target.Type())) // start from the zero value

	switch configured := m.Reply.Load().(type) {
	case nil:
		// Nothing configured: the caller sees the zero value.
	case error:
		return configured
	default:
		target.Set(reflect.ValueOf(configured))
	}
	return nil
}
|
|
@ -38,6 +38,13 @@ const (
|
||||||
UserAgent = "Consul Health Check"
|
UserAgent = "Consul Health Check"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// RPC is the interface an RPC client must implement. It is a helper
// interface, implemented by the agent delegate, for checks that need to make
// RPC calls.
type RPC interface {
	// RPC calls the named method with args; reply is passed for the callee
	// to fill in. A non-nil error reports a failed call.
	RPC(method string, args, reply interface{}) error
}
|
||||||
|
|
||||||
// CheckNotifier interface is used by the CheckMonitor
|
// CheckNotifier interface is used by the CheckMonitor
|
||||||
// to notify when a check has a status update. The update
|
// to notify when a check has a status update. The update
|
||||||
// should take care to be idempotent.
|
// should take care to be idempotent.
|
||||||
|
|
Loading…
Reference in New Issue