Adds fine-grained watches to health endpoints.

This commit is contained in:
James Phillips 2017-01-23 23:37:21 -08:00
parent e4b88324b3
commit dcb55c766b
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
9 changed files with 542 additions and 214 deletions

View File

@ -146,7 +146,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
// Verify check
_, checks, err := fsm.state.NodeChecks("foo")
_, checks, err := fsm.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -268,7 +268,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
}
// Verify check not registered
_, checks, err := fsm.state.NodeChecks("foo")
_, checks, err := fsm.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -344,7 +344,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
}
// Verify checks not registered
_, checks, err := fsm.state.NodeChecks("foo")
_, checks, err := fsm.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -482,7 +482,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("Bad: %v", fooSrv)
}
_, checks, err := fsm2.state.NodeChecks("foo")
_, checks, err := fsm2.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// Health endpoint is used to query the health information
@ -20,18 +21,17 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
// Get the state specific checks
state := h.srv.fsm.State()
return h.srv.blockingRPC(
return h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ChecksInState"),
func() error {
func(ws memdb.WatchSet) error {
var index uint64
var checks structs.HealthChecks
var err error
if len(args.NodeMetaFilters) > 0 {
index, checks, err = state.ChecksInStateByNodeMeta(args.State, args.NodeMetaFilters)
index, checks, err = state.ChecksInStateByNodeMeta(ws, args.State, args.NodeMetaFilters)
} else {
index, checks, err = state.ChecksInState(args.State)
index, checks, err = state.ChecksInState(ws, args.State)
}
if err != nil {
return err
@ -53,12 +53,11 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
// Get the node checks
state := h.srv.fsm.State()
return h.srv.blockingRPC(
return h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("NodeChecks"),
func() error {
index, checks, err := state.NodeChecks(args.Node)
func(ws memdb.WatchSet) error {
index, checks, err := state.NodeChecks(ws, args.Node)
if err != nil {
return err
}
@ -82,18 +81,17 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
// Get the service checks
state := h.srv.fsm.State()
return h.srv.blockingRPC(
return h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ServiceChecks"),
func() error {
func(ws memdb.WatchSet) error {
var index uint64
var checks structs.HealthChecks
var err error
if len(args.NodeMetaFilters) > 0 {
index, checks, err = state.ServiceChecksByNodeMeta(args.ServiceName, args.NodeMetaFilters)
index, checks, err = state.ServiceChecksByNodeMeta(ws, args.ServiceName, args.NodeMetaFilters)
} else {
index, checks, err = state.ServiceChecks(args.ServiceName)
index, checks, err = state.ServiceChecks(ws, args.ServiceName)
}
if err != nil {
return err
@ -119,18 +117,17 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
// Get the nodes
state := h.srv.fsm.State()
err := h.srv.blockingRPC(
err := h.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("CheckServiceNodes"),
func() error {
func(ws memdb.WatchSet) error {
var index uint64
var nodes structs.CheckServiceNodes
var err error
if args.TagFilter {
index, nodes, err = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
index, nodes, err = state.CheckServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
} else {
index, nodes, err = state.CheckServiceNodes(args.ServiceName)
index, nodes, err = state.CheckServiceNodes(ws, args.ServiceName)
}
if err != nil {
return err

View File

@ -45,7 +45,7 @@ func TestHealthCheckRace(t *testing.T) {
}
// Verify the index
idx, out1, err := state.CheckServiceNodes("db")
idx, out1, err := state.CheckServiceNodes(nil, "db")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -68,7 +68,7 @@ func TestHealthCheckRace(t *testing.T) {
}
// Verify the index changed
idx, out2, err := state.CheckServiceNodes("db")
idx, out2, err := state.CheckServiceNodes(nil, "db")
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -262,7 +262,7 @@ func (s *Server) reconcile() (err error) {
// a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State()
_, checks, err := state.ChecksInState(structs.HealthAny)
_, checks, err := state.ChecksInState(nil, structs.HealthAny)
if err != nil {
return err
}
@ -402,7 +402,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
}
// Check if the serfCheck is in the passing state
_, checks, err := state.NodeChecks(member.Name)
_, checks, err := state.NodeChecks(nil, member.Name)
if err != nil {
return err
}
@ -446,7 +446,7 @@ func (s *Server) handleFailedMember(member serf.Member) error {
}
if node != nil && node.Address == member.Addr.String() {
// Check if the serfCheck is in the critical state
_, checks, err := state.NodeChecks(member.Name)
_, checks, err := state.NodeChecks(nil, member.Name)
if err != nil {
return err
}

View File

@ -44,7 +44,7 @@ func TestLeader_RegisterMember(t *testing.T) {
})
// Should have a check
_, checks, err := state.NodeChecks(c1.config.NodeName)
_, checks, err := state.NodeChecks(nil, c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -114,7 +114,7 @@ func TestLeader_FailedMember(t *testing.T) {
})
// Should have a check
_, checks, err := state.NodeChecks(c1.config.NodeName)
_, checks, err := state.NodeChecks(nil, c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -129,7 +129,7 @@ func TestLeader_FailedMember(t *testing.T) {
}
testutil.WaitForResult(func() (bool, error) {
_, checks, err = state.NodeChecks(c1.config.NodeName)
_, checks, err = state.NodeChecks(nil, c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -489,7 +489,7 @@ func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRe
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
reply *structs.PreparedQueryExecuteResponse) error {
state := p.srv.fsm.State()
_, nodes, err := state.CheckServiceNodes(query.Service.Service)
_, nodes, err := state.CheckServiceNodes(nil, query.Service.Service)
if err != nil {
return err
}

View File

@ -499,6 +499,7 @@ func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]st
if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) {
continue
}
// List all the services on the node
services, err := tx.Get("services", "node", n.Node)
if err != nil {
@ -552,7 +553,7 @@ func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
results = append(results, service.(*structs.ServiceNode))
}
// Fill in the address details.
// Fill in the node details.
results, err = s.parseServiceNodes(tx, ws, results)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
@ -585,7 +586,7 @@ func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag stri
}
}
// Fill in the address details.
// Fill in the node details.
results, err = s.parseServiceNodes(tx, ws, results)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
@ -882,13 +883,14 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64,
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeCheck")...)
idx := maxIndexTxn(tx, "checks")
// Return the check.
check, err := tx.First("checks", "id", nodeName, string(checkID))
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
if check != nil {
return idx, check.(*structs.HealthCheck), nil
} else {
@ -898,115 +900,20 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64,
// NodeChecks is used to retrieve checks associated with the
// given node from the state store.
func (s *StateStore) NodeChecks(nodeName string) (uint64, structs.HealthChecks, error) {
func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeChecks")...)
idx := maxIndexTxn(tx, "checks")
// Return the checks.
checks, err := tx.Get("checks", "node", nodeName)
iter, err := tx.Get("checks", "node", nodeName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
ws.Add(iter.WatchCh())
// ServiceChecks is used to get all checks associated with a
// given service ID. The query is performed against a service
// _name_ instead of a service ID.
func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecks")...)
// Return the checks.
checks, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// ServiceChecksByNodeMeta is used to get all checks associated with a
// given service ID, filtered by the given node metadata values. The query
// is performed against a service _name_ instead of a service ID.
func (s *StateStore) ServiceChecksByNodeMeta(serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecksByNodeMeta")...)
// Return the checks.
checks, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
}
// ChecksInState is used to query the state store for all checks
// which are in the provided state.
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInState")...)
// Query all checks if HealthAny is passed
if state == structs.HealthAny {
checks, err := tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// Any other state we need to query for explicitly
checks, err := tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// ChecksInStateByNodeMeta is used to query the state store for all checks
// which are in the provided state, filtered by the given node metadata values.
func (s *StateStore) ChecksInStateByNodeMeta(state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInStateByNodeMeta")...)
// Query all checks if HealthAny is passed
var checks memdb.ResultIterator
var err error
if state == structs.HealthAny {
checks, err = tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
} else {
// Any other state we need to query for explicitly
checks, err = tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
}
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
}
// parseChecks is a helper function used to deduplicate some
// repetitive code for returning health checks.
func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) {
// Gather the health checks and return them properly type casted.
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
results = append(results, check.(*structs.HealthCheck))
@ -1014,20 +921,140 @@ func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64,
return idx, results, nil
}
// ServiceChecks is used to get all checks associated with a
// given service ID. The query is performed against a service
// _name_ instead of a service ID.
func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "checks")
// Return the checks.
iter, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
ws.Add(iter.WatchCh())
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
results = append(results, check.(*structs.HealthCheck))
}
return idx, results, nil
}
// ServiceChecksByNodeMeta is used to get all checks associated with a
// given service ID, filtered by the given node metadata values. The query
// is performed against a service _name_ instead of a service ID.
func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "checks")
// Return the checks.
iter, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
ws.Add(iter.WatchCh())
return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters)
}
// ChecksInState is used to query the state store for all checks
// which are in the provided state.
func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "checks")
// Query all checks if HealthAny is passed, otherwise use the index.
var iter memdb.ResultIterator
var err error
if state == structs.HealthAny {
iter, err = tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
} else {
iter, err = tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
}
ws.Add(iter.WatchCh())
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
results = append(results, check.(*structs.HealthCheck))
}
return idx, results, nil
}
// ChecksInStateByNodeMeta is used to query the state store for all checks
// which are in the provided state, filtered by the given node metadata values.
func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "nodes", "checks")
// Query all checks if HealthAny is passed, otherwise use the index.
var iter memdb.ResultIterator
var err error
if state == structs.HealthAny {
iter, err = tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
} else {
iter, err = tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
}
ws.Add(iter.WatchCh())
return s.parseChecksByNodeMeta(tx, ws, idx, iter, filters)
}
// parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields.
func (s *StateStore) parseChecksByNodeMeta(idx uint64, iter memdb.ResultIterator, tx *memdb.Txn,
filters map[string]string) (uint64, structs.HealthChecks, error) {
func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
allNodesCh := allNodes.WatchCh()
// Only take results for nodes that satisfy the node metadata filters.
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
healthCheck := check.(*structs.HealthCheck)
node, err := tx.First("nodes", "id", healthCheck.Node)
watchCh, node, err := tx.FirstWatch("nodes", "id", healthCheck.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
return 0, nil, ErrMissingNode
}
// Add even the filtered nodes so we wake up if the node metadata
// changes.
ws.AddWithLimit(watchLimit, watchCh, allNodesCh)
if structs.SatisfiesMetaFilters(node.(*structs.Node).Meta, filters) {
results = append(results, healthCheck)
}
@ -1093,58 +1120,61 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
}
// CheckServiceNodes is used to query all nodes and checks for a given service.
func (s *StateStore) CheckServiceNodes(serviceName string) (uint64, structs.CheckServiceNodes, error) {
func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...)
idx := maxIndexTxn(tx, "nodes", "services", "checks")
// Query the state store for the service.
services, err := tx.Get("services", "service", serviceName)
iter, err := tx.Get("services", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
ws.Add(iter.WatchCh())
// Return the results.
var results structs.ServiceNodes
for service := services.Next(); service != nil; service = services.Next() {
for service := iter.Next(); service != nil; service = iter.Next() {
results = append(results, service.(*structs.ServiceNode))
}
return s.parseCheckServiceNodes(tx, idx, results, err)
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
}
// CheckServiceTagNodes is used to query all nodes and checks for a given
// service, filtering out services that don't contain the given tag.
func (s *StateStore) CheckServiceTagNodes(serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...)
idx := maxIndexTxn(tx, "nodes", "services", "checks")
// Query the state store for the service.
services, err := tx.Get("services", "service", serviceName)
iter, err := tx.Get("services", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
ws.Add(iter.WatchCh())
// Return the results, filtering by tag.
var results structs.ServiceNodes
for service := services.Next(); service != nil; service = services.Next() {
for service := iter.Next(); service != nil; service = iter.Next() {
svc := service.(*structs.ServiceNode)
if !serviceTagFilter(svc, tag) {
results = append(results, svc)
}
}
return s.parseCheckServiceNodes(tx, idx, results, err)
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
}
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
func (s *StateStore) parseCheckServiceNodes(
tx *memdb.Txn, idx uint64, services structs.ServiceNodes,
tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
serviceName string, services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil {
return 0, nil, err
@ -1156,32 +1186,57 @@ func (s *StateStore) parseCheckServiceNodes(
return idx, nil, nil
}
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
allNodesCh := allNodes.WatchCh()
// We need a similar fallback for checks. Since services need the
// status of node + service-specific checks, we pull in a top-level
// watch over all checks.
allChecks, err := tx.Get("checks", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
}
allChecksCh := allChecks.WatchCh()
results := make(structs.CheckServiceNodes, 0, len(services))
for _, sn := range services {
// Retrieve the node.
n, err := tx.First("nodes", "id", sn.Node)
watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
ws.AddWithLimit(watchLimit, watchCh, allNodesCh)
if n == nil {
return 0, nil, ErrMissingNode
}
node := n.(*structs.Node)
// We need to return the checks specific to the given service
// as well as the node itself. Unfortunately, memdb won't let
// us use the index to do the latter query so we have to pull
// them all and filter.
// First add the node-level checks. These always apply to any
// service on the node.
var checks structs.HealthChecks
iter, err := tx.Get("checks", "node", sn.Node)
iter, err := tx.Get("checks", "node_service_check", sn.Node, false)
if err != nil {
return 0, nil, err
}
ws.AddWithLimit(watchLimit, iter.WatchCh(), allChecksCh)
for check := iter.Next(); check != nil; check = iter.Next() {
hc := check.(*structs.HealthCheck)
if hc.ServiceID == "" || hc.ServiceID == sn.ServiceID {
checks = append(checks, hc)
}
checks = append(checks, check.(*structs.HealthCheck))
}
// Now add the service-specific checks.
iter, err = tx.Get("checks", "node_service", sn.Node, sn.ServiceID)
if err != nil {
return 0, nil, err
}
ws.AddWithLimit(watchLimit, iter.WatchCh(), allChecksCh)
for check := iter.Next(); check != nil; check = iter.Next() {
checks = append(checks, check.(*structs.HealthCheck))
}
// Append to the results.

View File

@ -106,7 +106,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
// Verify that the check got registered.
verifyCheck := func() {
idx, out, err := s.NodeChecks("node1")
idx, out, err := s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -154,7 +154,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
verifyNode()
verifyService()
{
idx, out, err := s.NodeChecks("node1")
idx, out, err := s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -252,7 +252,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
// Verify that the check got registered.
verifyCheck := func() {
idx, out, err := s.NodeChecks("node1")
idx, out, err := s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -290,7 +290,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
verifyNode()
verifyService()
func() {
idx, out, err := s.NodeChecks("node1")
idx, out, err := s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1620,7 +1620,7 @@ func TestStateStore_EnsureCheck(t *testing.T) {
}
// Retrieve the check and make sure it matches
idx, checks, err := s.NodeChecks("node1")
idx, checks, err := s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1641,7 +1641,7 @@ func TestStateStore_EnsureCheck(t *testing.T) {
}
// Check that we successfully updated
idx, checks, err = s.NodeChecks("node1")
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1681,7 +1681,7 @@ func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) {
}
// Get the check again
_, result, err := s.NodeChecks("node1")
_, result, err := s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1695,19 +1695,34 @@ func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) {
func TestStateStore_NodeChecks(t *testing.T) {
s := testStateStore(t)
// Create the first node and service with some checks
// Do an initial query for a node that doesn't exist.
ws := memdb.NewWatchSet()
idx, checks, err := s.NodeChecks(ws, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if len(checks) != 0 {
t.Fatalf("bad: %#v", checks)
}
// Create some nodes and checks.
testRegisterNode(t, s, 0, "node1")
testRegisterService(t, s, 1, "node1", "service1")
testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing)
// Create a second node/service with a different set of checks
testRegisterNode(t, s, 4, "node2")
testRegisterService(t, s, 5, "node2", "service2")
testRegisterCheck(t, s, 6, "node2", "service2", "check3", structs.HealthPassing)
if !watchFired(ws) {
t.Fatalf("bad")
}
// Try querying for all checks associated with node1
idx, checks, err := s.NodeChecks("node1")
ws = memdb.NewWatchSet()
idx, checks, err = s.NodeChecks(ws, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1718,35 +1733,64 @@ func TestStateStore_NodeChecks(t *testing.T) {
t.Fatalf("bad checks: %#v", checks)
}
// Creating some unrelated node should not fire the watch.
testRegisterNode(t, s, 7, "node3")
testRegisterCheck(t, s, 8, "node3", "", "check1", structs.HealthPassing)
if watchFired(ws) {
t.Fatalf("bad")
}
// Try querying for all checks associated with node2
idx, checks, err = s.NodeChecks("node2")
ws = memdb.NewWatchSet()
idx, checks, err = s.NodeChecks(ws, "node2")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
if len(checks) != 1 || checks[0].CheckID != "check3" {
t.Fatalf("bad checks: %#v", checks)
}
// Changing node2 should fire the watch.
testRegisterCheck(t, s, 9, "node2", "service2", "check3", structs.HealthCritical)
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ServiceChecks(t *testing.T) {
s := testStateStore(t)
// Create the first node and service with some checks
// Do an initial query for a service that doesn't exist.
ws := memdb.NewWatchSet()
idx, checks, err := s.ServiceChecks(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if len(checks) != 0 {
t.Fatalf("bad: %#v", checks)
}
// Create some nodes and checks.
testRegisterNode(t, s, 0, "node1")
testRegisterService(t, s, 1, "node1", "service1")
testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing)
// Create a second node/service with a different set of checks
testRegisterNode(t, s, 4, "node2")
testRegisterService(t, s, 5, "node2", "service2")
testRegisterCheck(t, s, 6, "node2", "service2", "check3", structs.HealthPassing)
if !watchFired(ws) {
t.Fatalf("bad")
}
// Try querying for all checks associated with service1
idx, checks, err := s.ServiceChecks("service1")
// Try querying for all checks associated with service1.
ws = memdb.NewWatchSet()
idx, checks, err = s.ServiceChecks(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1756,21 +1800,48 @@ func TestStateStore_ServiceChecks(t *testing.T) {
if len(checks) != 2 || checks[0].CheckID != "check1" || checks[1].CheckID != "check2" {
t.Fatalf("bad checks: %#v", checks)
}
// Adding some unrelated service + check should not fire the watch.
testRegisterService(t, s, 7, "node1", "service3")
testRegisterCheck(t, s, 8, "node1", "service3", "check3", structs.HealthPassing)
if watchFired(ws) {
t.Fatalf("bad")
}
// Updating a related check should fire the watch.
testRegisterCheck(t, s, 9, "node1", "service1", "check2", structs.HealthCritical)
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Create the first node and service with some checks
// Querying with no results returns nil.
ws := memdb.NewWatchSet()
idx, checks, err := s.ServiceChecksByNodeMeta(ws, "service1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if len(checks) != 0 {
t.Fatalf("bad: %#v", checks)
}
// Create some nodes and checks.
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
testRegisterService(t, s, 1, "node1", "service1")
testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing)
// Create a second node/service with a different set of checks
testRegisterNodeWithMeta(t, s, 4, "node2", map[string]string{"common": "1"})
testRegisterService(t, s, 5, "node2", "service1")
testRegisterCheck(t, s, 6, "node2", "service1", "check3", structs.HealthPassing)
if !watchFired(ws) {
t.Fatalf("bad")
}
cases := []struct {
filters map[string]string
@ -1798,9 +1869,11 @@ func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) {
},
}
// Try querying for all checks associated with service1
// Try querying for all checks associated with service1.
idx = 7
for _, tc := range cases {
_, checks, err := s.ServiceChecksByNodeMeta("service1", tc.filters)
ws = memdb.NewWatchSet()
_, checks, err := s.ServiceChecksByNodeMeta(ws, "service1", tc.filters)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1812,6 +1885,39 @@ func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) {
t.Fatalf("bad checks: %#v", checks)
}
}
// Registering some unrelated node should not fire the watch.
testRegisterNode(t, s, idx, fmt.Sprintf("nope%d", idx))
idx++
if watchFired(ws) {
t.Fatalf("bad")
}
}
// Overwhelm the node tracking.
for i := 0; i < 2*watchLimit; i++ {
node := fmt.Sprintf("many%d", idx)
testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"})
idx++
testRegisterService(t, s, idx, node, "service1")
idx++
testRegisterCheck(t, s, idx, node, "service1", "check1", structs.HealthPassing)
idx++
}
// Now get a fresh watch, which will be forced to watch the whole
// node table.
ws = memdb.NewWatchSet()
_, _, err = s.ServiceChecksByNodeMeta(ws, "service1",
map[string]string{"common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
// Registering some unrelated node should now fire the watch.
testRegisterNode(t, s, idx, "nope")
if !watchFired(ws) {
t.Fatalf("bad")
}
}
@ -1819,7 +1925,8 @@ func TestStateStore_ChecksInState(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil
idx, res, err := s.ChecksInState(structs.HealthPassing)
ws := memdb.NewWatchSet()
idx, res, err := s.ChecksInState(ws, structs.HealthPassing)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
@ -1829,9 +1936,13 @@ func TestStateStore_ChecksInState(t *testing.T) {
testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical)
testRegisterCheck(t, s, 3, "node1", "", "check3", structs.HealthPassing)
if !watchFired(ws) {
t.Fatalf("bad")
}
// Query the state store for passing checks.
_, checks, err := s.ChecksInState(structs.HealthPassing)
ws = memdb.NewWatchSet()
_, checks, err := s.ChecksInState(ws, structs.HealthPassing)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1843,33 +1954,55 @@ func TestStateStore_ChecksInState(t *testing.T) {
if checks[0].CheckID != "check1" || checks[1].CheckID != "check3" {
t.Fatalf("bad: %#v", checks)
}
if watchFired(ws) {
t.Fatalf("bad")
}
// Changing the state of a check should fire the watch.
testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthCritical)
if !watchFired(ws) {
t.Fatalf("bad")
}
// HealthAny just returns everything.
_, checks, err = s.ChecksInState(structs.HealthAny)
ws = memdb.NewWatchSet()
_, checks, err = s.ChecksInState(ws, structs.HealthAny)
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(checks); n != 3 {
t.Fatalf("expected 3 checks, got: %d", n)
}
if watchFired(ws) {
t.Fatalf("bad")
}
// Adding a new check should fire the watch.
testRegisterCheck(t, s, 5, "node1", "", "check4", structs.HealthCritical)
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil
idx, res, err := s.ChecksInStateByNodeMeta(structs.HealthPassing, nil)
// Querying with no results returns nil.
ws := memdb.NewWatchSet()
idx, res, err := s.ChecksInStateByNodeMeta(ws, structs.HealthPassing, nil)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register a node with checks in varied states
// Register a node with checks in varied states.
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical)
testRegisterNodeWithMeta(t, s, 3, "node2", map[string]string{"common": "1"})
testRegisterCheck(t, s, 4, "node2", "", "check3", structs.HealthPassing)
if !watchFired(ws) {
t.Fatalf("bad")
}
cases := []struct {
filters map[string]string
@ -1919,9 +2052,11 @@ func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) {
},
}
// Try querying for all checks associated with service1
// Try querying for all checks associated with service1.
idx = 5
for _, tc := range cases {
_, checks, err := s.ChecksInStateByNodeMeta(tc.state, tc.filters)
ws = memdb.NewWatchSet()
_, checks, err := s.ChecksInStateByNodeMeta(ws, tc.state, tc.filters)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1933,23 +2068,70 @@ func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) {
t.Fatalf("bad checks: %#v, %v", checks, tc.checks)
}
}
// Registering some unrelated node should not fire the watch.
testRegisterNode(t, s, idx, fmt.Sprintf("nope%d", idx))
idx++
if watchFired(ws) {
t.Fatalf("bad")
}
}
// Overwhelm the node tracking.
for i := 0; i < 2*watchLimit; i++ {
node := fmt.Sprintf("many%d", idx)
testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"})
idx++
testRegisterService(t, s, idx, node, "service1")
idx++
testRegisterCheck(t, s, idx, node, "service1", "check1", structs.HealthPassing)
idx++
}
// Now get a fresh watch, which will be forced to watch the whole
// node table.
ws = memdb.NewWatchSet()
_, _, err = s.ChecksInStateByNodeMeta(ws, structs.HealthPassing,
map[string]string{"common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
// Registering some unrelated node should now fire the watch.
testRegisterNode(t, s, idx, "nope")
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_DeleteCheck(t *testing.T) {
s := testStateStore(t)
// Register a node and a node-level health check
// Register a node and a node-level health check.
testRegisterNode(t, s, 1, "node1")
testRegisterCheck(t, s, 2, "node1", "", "check1", structs.HealthPassing)
// Delete the check
// Make sure the check is there.
ws := memdb.NewWatchSet()
_, checks, err := s.NodeChecks(ws, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 1 {
t.Fatalf("bad: %#v", checks)
}
// Delete the check.
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Check is gone
_, checks, err := s.NodeChecks("node1")
ws = memdb.NewWatchSet()
_, checks, err = s.NodeChecks(ws, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1957,50 +2139,59 @@ func TestStateStore_DeleteCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks)
}
// Index tables were updated
// Index tables were updated.
if idx := s.maxIndex("checks"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
// Deleting a nonexistent check should be idempotent and not return an
// error
// error.
if err := s.DeleteCheck(4, "node1", "check1"); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("checks"); idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_CheckServiceNodes(t *testing.T) {
s := testStateStore(t)
// Querying with no matches gives an empty response
idx, res, err := s.CheckServiceNodes("service1")
ws := memdb.NewWatchSet()
idx, res, err := s.CheckServiceNodes(ws, "service1")
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register some nodes
// Register some nodes.
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
// Register node-level checks. These should not be returned
// in the final result.
// Register node-level checks. These should be the final result.
testRegisterCheck(t, s, 2, "node1", "", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node2", "", "check2", structs.HealthPassing)
// Register a service against the nodes
// Register a service against the nodes.
testRegisterService(t, s, 4, "node1", "service1")
testRegisterService(t, s, 5, "node2", "service2")
// Register checks against the services
// Register checks against the services.
testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing)
testRegisterCheck(t, s, 7, "node2", "service2", "check4", structs.HealthPassing)
// Query the state store for nodes and checks which
// have been registered with a specific service.
idx, results, err := s.CheckServiceNodes("service1")
// At this point all the changes should have fired the watch.
if !watchFired(ws) {
t.Fatalf("bad")
}
// Query the state store for nodes and checks which have been registered
// with a specific service.
ws = memdb.NewWatchSet()
idx, results, err := s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -2008,18 +2199,24 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Make sure we get the expected result (service check + node check)
// Make sure we get the expected result (service check + node check).
if n := len(results); n != 1 {
t.Fatalf("expected 1 result, got: %d", n)
}
csn := results[0]
if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 2 {
if csn.Node == nil || csn.Service == nil || len(csn.Checks) != 2 ||
csn.Checks[0].ServiceID != "" || csn.Checks[0].CheckID != "check1" ||
csn.Checks[1].ServiceID != "service1" || csn.Checks[1].CheckID != "check3" {
t.Fatalf("bad output: %#v", csn)
}
// Node updates alter the returned index
// Node updates alter the returned index and fire the watch.
testRegisterNode(t, s, 8, "node1")
idx, results, err = s.CheckServiceNodes("service1")
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, results, err = s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -2027,9 +2224,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Service updates alter the returned index
// Service updates alter the returned index and fire the watch.
testRegisterService(t, s, 9, "node1", "service1")
idx, results, err = s.CheckServiceNodes("service1")
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, results, err = s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -2037,15 +2238,64 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Check updates alter the returned index
// Check updates alter the returned index and fire the watch.
testRegisterCheck(t, s, 10, "node1", "service1", "check1", structs.HealthCritical)
idx, results, err = s.CheckServiceNodes("service1")
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, results, err = s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 10 {
t.Fatalf("bad index: %d", idx)
}
// Registering some unrelated node + service should not fire the watch.
testRegisterNode(t, s, 11, "nope")
testRegisterService(t, s, 12, "nope", "nope")
if watchFired(ws) {
t.Fatalf("bad")
}
// Overwhelm node and check tracking.
idx = 13
for i := 0; i < 2*watchLimit; i++ {
node := fmt.Sprintf("many%d", i)
testRegisterNode(t, s, idx, node)
idx++
testRegisterCheck(t, s, idx, node, "", "check1", structs.HealthPassing)
idx++
testRegisterService(t, s, idx, node, "service1")
idx++
testRegisterCheck(t, s, idx, node, "service1", "check2", structs.HealthPassing)
idx++
}
// Now registering an unrelated node will fire the watch.
ws = memdb.NewWatchSet()
idx, results, err = s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
testRegisterNode(t, s, idx, "more-nope")
idx++
if !watchFired(ws) {
t.Fatalf("bad")
}
// Also, registering an unrelated check will fire the watch.
ws = memdb.NewWatchSet()
idx, results, err = s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
}
testRegisterCheck(t, s, idx, "more-nope", "", "check1", structs.HealthPassing)
idx++
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func BenchmarkCheckServiceNodes(b *testing.B) {
@ -2080,8 +2330,9 @@ func BenchmarkCheckServiceNodes(b *testing.B) {
b.Fatalf("err: %v", err)
}
ws := memdb.NewWatchSet()
for i := 0; i < b.N; i++ {
s.CheckServiceNodes("db")
s.CheckServiceNodes(ws, "db")
}
}
@ -2114,7 +2365,8 @@ func TestStateStore_CheckServiceTagNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}
idx, nodes, err := s.CheckServiceTagNodes("db", "master")
ws := memdb.NewWatchSet()
idx, nodes, err := s.CheckServiceTagNodes(ws, "db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -2139,6 +2391,14 @@ func TestStateStore_CheckServiceTagNodes(t *testing.T) {
if nodes[0].Checks[1].CheckID != "db" {
t.Fatalf("Bad: %v", nodes[0])
}
// Changing a tag should fire the watch.
if err := s.EnsureService(4, "foo", &structs.NodeService{ID: "db1", Service: "db", Tags: []string{"nope"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_Check_Snapshot(t *testing.T) {

View File

@ -188,6 +188,22 @@ func checksTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"node_service_check": &memdb.IndexSchema{
Name: "node_service_check",
AllowMissing: true,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.FieldSetIndex{
Field: "ServiceID",
},
},
},
},
"node_service": &memdb.IndexSchema{
Name: "node_service",
AllowMissing: true,