mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
Adds fine-grained watches to internal endpoints.
This commit is contained in:
parent
3675e5ceba
commit
dfcffe097c
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
@ -23,12 +24,11 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
|
||||
|
||||
// Get the node info
|
||||
state := m.srv.fsm.State()
|
||||
return m.srv.blockingRPC(
|
||||
return m.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("NodeInfo"),
|
||||
func() error {
|
||||
index, dump, err := state.NodeInfo(args.Node)
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, dump, err := state.NodeInfo(ws, args.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -47,12 +47,11 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
|
||||
|
||||
// Get all the node info
|
||||
state := m.srv.fsm.State()
|
||||
return m.srv.blockingRPC(
|
||||
return m.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("NodeDump"),
|
||||
func() error {
|
||||
index, dump, err := state.NodeDump()
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, dump, err := state.NodeDump(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1252,25 +1252,26 @@ func (s *StateStore) parseCheckServiceNodes(
|
||||
|
||||
// NodeInfo is used to generate a dump of a single node. The dump includes
|
||||
// all services and checks which are registered against the node.
|
||||
func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump, error) {
|
||||
func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("NodeInfo")...)
|
||||
idx := maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
|
||||
// Query the node by the passed node
|
||||
nodes, err := tx.Get("nodes", "id", node)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
return s.parseNodes(tx, idx, nodes)
|
||||
ws.Add(nodes.WatchCh())
|
||||
return s.parseNodes(tx, ws, idx, nodes)
|
||||
}
|
||||
|
||||
// NodeDump is used to generate a dump of all nodes. This call is expensive
|
||||
// as it has to query every node, service, and check. The response can also
|
||||
// be quite large since there is currently no filtering applied.
|
||||
func (s *StateStore) NodeDump() (uint64, structs.NodeDump, error) {
|
||||
func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
@ -1282,15 +1283,31 @@ func (s *StateStore) NodeDump() (uint64, structs.NodeDump, error) {
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
return s.parseNodes(tx, idx, nodes)
|
||||
ws.Add(nodes.WatchCh())
|
||||
return s.parseNodes(tx, ws, idx, nodes)
|
||||
}
|
||||
|
||||
// parseNodes takes an iterator over a set of nodes and returns a struct
|
||||
// containing the nodes along with all of their associated services
|
||||
// and/or health checks.
|
||||
func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
|
||||
func (s *StateStore) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
|
||||
iter memdb.ResultIterator) (uint64, structs.NodeDump, error) {
|
||||
|
||||
// We don't want to track an unlimited number of services, so we pull a
|
||||
// top-level watch to use as a fallback.
|
||||
allServices, err := tx.Get("services", "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
allServicesCh := allServices.WatchCh()
|
||||
|
||||
// We need a similar fallback for checks.
|
||||
allChecks, err := tx.Get("checks", "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
|
||||
}
|
||||
allChecksCh := allChecks.WatchCh()
|
||||
|
||||
var results structs.NodeDump
|
||||
for n := iter.Next(); n != nil; n = iter.Next() {
|
||||
node := n.(*structs.Node)
|
||||
@ -1309,6 +1326,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh)
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
ns := service.(*structs.ServiceNode).ToNodeService()
|
||||
dump.Services = append(dump.Services, ns)
|
||||
@ -1319,6 +1337,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
ws.AddWithLimit(watchLimit, checks.WatchCh(), allChecksCh)
|
||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||
hc := check.(*structs.HealthCheck)
|
||||
dump.Checks = append(dump.Checks, hc)
|
||||
|
@ -2499,11 +2499,13 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Generating a node dump that matches nothing returns empty
|
||||
idx, dump, err := s.NodeInfo("node1")
|
||||
wsInfo := memdb.NewWatchSet()
|
||||
idx, dump, err := s.NodeInfo(wsInfo, "node1")
|
||||
if idx != 0 || dump != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, dump, err)
|
||||
}
|
||||
idx, dump, err = s.NodeDump()
|
||||
wsDump := memdb.NewWatchSet()
|
||||
idx, dump, err = s.NodeDump(wsDump)
|
||||
if idx != 0 || dump != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, dump, err)
|
||||
}
|
||||
@ -2526,6 +2528,14 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||
testRegisterCheck(t, s, 8, "node1", "", "check2", structs.HealthPassing)
|
||||
testRegisterCheck(t, s, 9, "node2", "", "check2", structs.HealthPassing)
|
||||
|
||||
// Both watches should have fired due to the changes above.
|
||||
if !watchFired(wsInfo) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if !watchFired(wsDump) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Check that our result matches what we expect.
|
||||
expect := structs.NodeDump{
|
||||
&structs.NodeInfo{
|
||||
@ -2629,7 +2639,8 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||
}
|
||||
|
||||
// Get a dump of just a single node
|
||||
idx, dump, err = s.NodeInfo("node1")
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, dump, err = s.NodeInfo(ws, "node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -2641,7 +2652,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||
}
|
||||
|
||||
// Generate a dump of all the nodes
|
||||
idx, dump, err = s.NodeDump()
|
||||
idx, dump, err = s.NodeDump(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -2651,4 +2662,12 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
|
||||
if !reflect.DeepEqual(dump, expect) {
|
||||
t.Fatalf("bad: %#v", dump[0].Services[0])
|
||||
}
|
||||
|
||||
// Registering some unrelated node + service + check should not fire the
|
||||
// watch.
|
||||
testRegisterNode(t, s, 10, "nope")
|
||||
testRegisterService(t, s, 11, "nope", "nope")
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user