Adds fine-grained watches to catalog endpoints.

This commit is contained in:
James Phillips 2017-01-19 23:36:50 -08:00
parent f2d9da270d
commit b7b42d718a
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
8 changed files with 369 additions and 125 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)
@ -79,7 +80,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
// Check the complete register request against the given ACL policy.
if acl != nil && c.srv.config.ACLEnforceVersion8 {
state := c.srv.fsm.State()
_, ns, err := state.NodeServices(args.Node)
_, ns, err := state.NodeServices(nil, args.Node)
if err != nil {
return fmt.Errorf("Node lookup failed: %v", err)
}
@ -164,18 +165,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Nodes"),
func() error {
func(ws memdb.WatchSet) error {
var index uint64
var nodes structs.Nodes
var err error
if len(args.NodeMetaFilters) > 0 {
index, nodes, err = state.NodesByMeta(args.NodeMetaFilters)
index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
} else {
index, nodes, err = state.Nodes()
index, nodes, err = state.Nodes(ws)
}
if err != nil {
return err
@ -197,18 +197,17 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Services"),
func() error {
func(ws memdb.WatchSet) error {
var index uint64
var services structs.Services
var err error
if len(args.NodeMetaFilters) > 0 {
index, services, err = state.ServicesByNodeMeta(args.NodeMetaFilters)
index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters)
} else {
index, services, err = state.Services()
index, services, err = state.Services(ws)
}
if err != nil {
return err
@ -232,18 +231,17 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingRPC(
err := c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ServiceNodes"),
func() error {
func(ws memdb.WatchSet) error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
} else {
index, services, err = state.ServiceNodes(args.ServiceName)
index, services, err = state.ServiceNodes(ws, args.ServiceName)
}
if err != nil {
return err
@ -290,12 +288,11 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("NodeServices"),
func() error {
index, services, err := state.NodeServices(args.Node)
func(ws memdb.WatchSet) error {
index, services, err := state.NodeServices(ws, args.Node)
if err != nil {
return err
}

View File

@ -84,7 +84,7 @@ func TestFSM_RegisterNode(t *testing.T) {
}
// Verify service registered
_, services, err := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -137,7 +137,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
// Verify service registered
_, services, err := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -207,7 +207,7 @@ func TestFSM_DeregisterService(t *testing.T) {
}
// Verify service not registered
_, services, err := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -335,7 +335,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
}
// Verify service not registered
_, services, err := fsm.state.NodeServices("foo")
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -449,7 +449,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify the contents
_, nodes, err := fsm2.state.Nodes()
_, nodes, err := fsm2.state.Nodes(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -468,7 +468,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("bad: %v", nodes[1])
}
_, fooSrv, err := fsm2.state.NodeServices("foo")
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -287,7 +287,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
}
// Get the node services, look for ConsulServiceID
_, services, err := state.NodeServices(check.Node)
_, services, err := state.NodeServices(nil, check.Node)
if err != nil {
return err
}
@ -385,7 +385,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
// Check if the associated service is available
if service != nil {
match := false
_, services, err := state.NodeServices(member.Name)
_, services, err := state.NodeServices(nil, member.Name)
if err != nil {
return err
}

View File

@ -71,7 +71,7 @@ func TestLeader_RegisterMember(t *testing.T) {
}
// Service should be registered
_, services, err := state.NodeServices(s1.config.NodeName)
_, services, err := state.NodeServices(nil, s1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -201,18 +201,19 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
}
// Nodes is used to return all of the known nodes.
func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
idx := maxIndexTxn(tx, "nodes")
// Retrieve all of the nodes
nodes, err := tx.Get("nodes", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
ws.Add(nodes.WatchCh())
// Create and return the nodes list.
var results structs.Nodes
@ -223,12 +224,12 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
}
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nodes, error) {
func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
idx := maxIndexTxn(tx, "nodes")
// Retrieve all of the nodes
var args []interface{}
@ -240,6 +241,7 @@ func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nod
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
ws.Add(nodes.WatchCh())
// Create and return the nodes list.
var results structs.Nodes
@ -422,18 +424,19 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
}
// Services returns all services along with a list of associated tags.
func (s *StateStore) Services() (uint64, structs.Services, error) {
func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Services")...)
idx := maxIndexTxn(tx, "services")
// List all the services.
services, err := tx.Get("services", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed querying services: %s", err)
}
ws.Add(services.WatchCh())
// Rip through the services and enumerate them and their unique set of
// tags.
@ -462,12 +465,12 @@ func (s *StateStore) Services() (uint64, structs.Services, error) {
}
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, structs.Services, error) {
func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
idx := maxIndexTxn(tx, "services", "nodes")
// Retrieve all of the nodes with the meta k/v pair
var args []interface{}
@ -479,6 +482,15 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
ws.Add(nodes.WatchCh())
// We don't want to track an unlimited number of services, so we pull a
// top-level watch to use as a fallback.
allServices, err := tx.Get("services", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
}
allServicesCh := allServices.WatchCh()
// Populate the services map
unique := make(map[string]map[string]struct{})
@ -492,6 +504,7 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru
if err != nil {
return 0, nil, fmt.Errorf("failed querying services: %s", err)
}
ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh)
// Rip through the services and enumerate them and their unique set of
// tags.
@ -520,25 +533,27 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru
}
// ServiceNodes returns the nodes associated with a given service name.
func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNodes, error) {
func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
idx := maxIndexTxn(tx, "nodes", "services")
// List all the services.
services, err := tx.Get("services", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
ws.Add(services.WatchCh())
var results structs.ServiceNodes
for service := services.Next(); service != nil; service = services.Next() {
results = append(results, service.(*structs.ServiceNode))
}
// Fill in the address details.
results, err = s.parseServiceNodes(tx, results)
results, err = s.parseServiceNodes(tx, ws, results)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
@ -547,18 +562,19 @@ func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNo
// ServiceTagNodes returns the nodes associated with a given service, filtering
// out services that don't contain the given tag.
func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes, error) {
func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
idx := maxIndexTxn(tx, "nodes", "services")
// List all the services.
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
ws.Add(services.WatchCh())
// Gather all the services and apply the tag filter.
var results structs.ServiceNodes
@ -570,7 +586,7 @@ func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.Servi
}
// Fill in the address details.
results, err = s.parseServiceNodes(tx, results)
results, err = s.parseServiceNodes(tx, ws, results)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
@ -595,7 +611,16 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
// parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice.
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (structs.ServiceNodes, error) {
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
// We don't want to track an unlimited number of nodes, so we pull a
// top-level watch to use as a fallback.
allNodes, err := tx.Get("nodes", "id")
if err != nil {
return nil, fmt.Errorf("failed nodes lookup: %s", err)
}
allNodesCh := allNodes.WatchCh()
// Fill in the node data for each service instance.
var results structs.ServiceNodes
for _, sn := range services {
// Note that we have to clone here because we don't want to
@ -604,10 +629,11 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
s := sn.PartialClone()
// Grab the corresponding node record.
n, err := tx.First("nodes", "id", sn.Node)
watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node)
if err != nil {
return nil, fmt.Errorf("failed node lookup: %s", err)
}
ws.AddWithLimit(watchLimit, watchCh, allNodesCh)
// Populate the node-related fields. The tagged addresses may be
// used by agents to perform address translation if they are
@ -646,18 +672,19 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st
}
// NodeServices is used to query service registrations by node ID.
func (s *StateStore) NodeServices(nodeName string) (uint64, *structs.NodeServices, error) {
func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeName string) (uint64, *structs.NodeServices, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeServices")...)
idx := maxIndexTxn(tx, "nodes", "services")
// Query the node
n, err := tx.First("nodes", "id", nodeName)
watchCh, n, err := tx.FirstWatch("nodes", "id", nodeName)
if err != nil {
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
}
ws.Add(watchCh)
if n == nil {
return 0, nil, nil
}
@ -668,6 +695,7 @@ func (s *StateStore) NodeServices(nodeName string) (uint64, *structs.NodeService
if err != nil {
return 0, nil, fmt.Errorf("failed querying services for node %q: %s", nodeName, err)
}
ws.Add(services.WatchCh())
// Initialize the node services struct
ns := &structs.NodeServices{

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
)
func TestStateStore_EnsureRegistration(t *testing.T) {
@ -60,7 +61,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
// Verify that the service got registered.
verifyService := func() {
idx, out, err := s.NodeServices("node1")
idx, out, err := s.NodeServices(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -219,7 +220,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
// Verify that the service got registered.
verifyService := func() {
idx, out, err := s.NodeServices("node1")
idx, out, err := s.NodeServices(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -481,34 +482,39 @@ func TestStateStore_EnsureNode(t *testing.T) {
func TestStateStore_GetNodes(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns nil
idx, res, err := s.Nodes()
// Listing with no results returns nil.
ws := memdb.NewWatchSet()
idx, res, err := s.Nodes(ws)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create some nodes in the state store
// Create some nodes in the state store.
testRegisterNode(t, s, 0, "node0")
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
if !watchFired(ws) {
t.Fatalf("bad")
}
// Retrieve the nodes
idx, nodes, err := s.Nodes()
// Retrieve the nodes.
ws = memdb.NewWatchSet()
idx, nodes, err := s.Nodes(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
// Highest index was returned
// Highest index was returned.
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
// All nodes were returned
// All nodes were returned.
if n := len(nodes); n != 3 {
t.Fatalf("bad node count: %d", n)
}
// Make sure the nodes match
// Make sure the nodes match.
for i, node := range nodes {
if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) {
t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex)
@ -518,6 +524,17 @@ func TestStateStore_GetNodes(t *testing.T) {
t.Fatalf("bad: %#v", node)
}
}
// Make sure a node delete fires the watch.
if watchFired(ws) {
t.Fatalf("bad")
}
if err := s.DeleteNode(3, "node1"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func BenchmarkGetNodes(b *testing.B) {
@ -533,8 +550,9 @@ func BenchmarkGetNodes(b *testing.B) {
b.Fatalf("err: %v", err)
}
ws := memdb.NewWatchSet()
for i := 0; i < b.N; i++ {
s.Nodes()
s.Nodes(ws)
}
}
@ -542,15 +560,19 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns nil
idx, res, err := s.NodesByMeta(map[string]string{"somekey": "somevalue"})
ws := memdb.NewWatchSet()
idx, res, err := s.NodesByMeta(ws, map[string]string{"somekey": "somevalue"})
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create some nodes in the state store
// Create some nodes in the state store.
testRegisterNodeWithMeta(t, s, 0, "node0", map[string]string{"role": "client"})
testRegisterNodeWithMeta(t, s, 1, "node1", map[string]string{"role": "client", "common": "1"})
testRegisterNodeWithMeta(t, s, 2, "node2", map[string]string{"role": "server", "common": "1"})
if !watchFired(ws) {
t.Fatalf("bad")
}
cases := []struct {
filters map[string]string
@ -579,7 +601,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
}
for _, tc := range cases {
_, result, err := s.NodesByMeta(tc.filters)
_, result, err := s.NodesByMeta(nil, tc.filters)
if err != nil {
t.Fatalf("bad: %v", err)
}
@ -594,23 +616,24 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
}
}
}
}
func BenchmarkGetNodesByMeta(b *testing.B) {
s, err := NewStateStore(nil)
// Set up a watch.
ws = memdb.NewWatchSet()
_, _, err = s.NodesByMeta(ws, map[string]string{"role": "client"})
if err != nil {
b.Fatalf("err: %s", err)
t.Fatalf("err: %v", err)
}
if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
b.Fatalf("err: %v", err)
}
if err := s.EnsureNode(101, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
b.Fatalf("err: %v", err)
// Make an unrelated modification and make sure the watch doesn't fire.
testRegisterNodeWithMeta(t, s, 3, "node3", map[string]string{"foo": "bar"})
if watchFired(ws) {
t.Fatalf("bad")
}
for i := 0; i < b.N; i++ {
s.Nodes()
// Change a watched key and make sure it fires.
testRegisterNodeWithMeta(t, s, 4, "node0", map[string]string{"role": "different"})
if !watchFired(ws) {
t.Fatalf("bad")
}
}
@ -766,13 +789,14 @@ func TestStateStore_Node_Watches(t *testing.T) {
func TestStateStore_EnsureService(t *testing.T) {
s := testStateStore(t)
// Fetching services for a node with none returns nil
idx, res, err := s.NodeServices("node1")
// Fetching services for a node with none returns nil.
ws := memdb.NewWatchSet()
idx, res, err := s.NodeServices(ws, "node1")
if err != nil || res != nil || idx != 0 {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create the service registration
// Create the service registration.
ns1 := &structs.NodeService{
ID: "service1",
Service: "redis",
@ -781,21 +805,35 @@ func TestStateStore_EnsureService(t *testing.T) {
Port: 1111,
}
// Creating a service without a node returns an error
// Creating a service without a node returns an error.
if err := s.EnsureService(1, "node1", ns1); err != ErrMissingNode {
t.Fatalf("expected %#v, got: %#v", ErrMissingNode, err)
}
if watchFired(ws) {
t.Fatalf("bad")
}
// Register the nodes
// Register the nodes.
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
if !watchFired(ws) {
t.Fatalf("bad")
}
// Service successfully registers into the state store
// Service successfully registers into the state store.
ws = memdb.NewWatchSet()
_, _, err = s.NodeServices(ws, "node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if err = s.EnsureService(10, "node1", ns1); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Register a similar service against both nodes
// Register a similar service against both nodes.
ns2 := *ns1
ns2.ID = "service2"
for _, n := range []string{"node1", "node2"} {
@ -804,15 +842,24 @@ func TestStateStore_EnsureService(t *testing.T) {
}
}
// Register a different service on the bad node
// Register a different service on the bad node.
ws = memdb.NewWatchSet()
_, _, err = s.NodeServices(ws, "node1")
if err != nil {
t.Fatalf("err: %v", err)
}
ns3 := *ns1
ns3.ID = "service3"
if err := s.EnsureService(30, "node2", &ns3); err != nil {
t.Fatalf("err: %s", err)
}
if watchFired(ws) {
t.Fatalf("bad")
}
// Retrieve the services
idx, out, err := s.NodeServices("node1")
// Retrieve the services.
ws = memdb.NewWatchSet()
idx, out, err := s.NodeServices(ws, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -820,12 +867,12 @@ func TestStateStore_EnsureService(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Only the services for the requested node are returned
// Only the services for the requested node are returned.
if out == nil || len(out.Services) != 2 {
t.Fatalf("bad services: %#v", out)
}
// Results match the inserted services and have the proper indexes set
// Results match the inserted services and have the proper indexes set.
expect1 := *ns1
expect1.CreateIndex, expect1.ModifyIndex = 10, 10
if svc := out.Services["service1"]; !reflect.DeepEqual(&expect1, svc) {
@ -838,19 +885,22 @@ func TestStateStore_EnsureService(t *testing.T) {
t.Fatalf("bad: %#v %#v", ns2, svc)
}
// Index tables were updated
// Index tables were updated.
if idx := s.maxIndex("services"); idx != 30 {
t.Fatalf("bad index: %d", idx)
}
// Update a service registration
// Update a service registration.
ns1.Address = "1.1.1.2"
if err := s.EnsureService(40, "node1", ns1); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Retrieve the service again and ensure it matches
idx, out, err = s.NodeServices("node1")
// Retrieve the service again and ensure it matches..
idx, out, err = s.NodeServices(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -866,7 +916,7 @@ func TestStateStore_EnsureService(t *testing.T) {
t.Fatalf("bad: %#v", svc)
}
// Index tables were updated
// Index tables were updated.
if idx := s.maxIndex("services"); idx != 40 {
t.Fatalf("bad index: %d", idx)
}
@ -875,6 +925,19 @@ func TestStateStore_EnsureService(t *testing.T) {
func TestStateStore_Services(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, services, err := s.Services(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if len(services) != 0 {
t.Fatalf("bad: %v", services)
}
// Register several nodes and services.
testRegisterNode(t, s, 1, "node1")
ns1 := &structs.NodeService{
@ -899,9 +962,13 @@ func TestStateStore_Services(t *testing.T) {
if err := s.EnsureService(5, "node2", ns2); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Pull all the services.
idx, services, err := s.Services()
ws = memdb.NewWatchSet()
idx, services, err = s.Services(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -922,18 +989,27 @@ func TestStateStore_Services(t *testing.T) {
if !reflect.DeepEqual(expected, services) {
t.Fatalf("bad: %#v", services)
}
// Deleting a node with a service should fire the watch.
if err := s.DeleteNode(6, "node1"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ServicesByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns nil
idx, res, err := s.ServicesByNodeMeta(map[string]string{"somekey": "somevalue"})
// Listing with no results returns nil.
ws := memdb.NewWatchSet()
idx, res, err := s.ServicesByNodeMeta(ws, map[string]string{"somekey": "somevalue"})
if idx != 0 || len(res) != 0 || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Create some nodes and services in the state store
// Create some nodes and services in the state store.
node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}}
if err := s.EnsureNode(0, node0); err != nil {
t.Fatalf("err: %v", err)
@ -962,9 +1038,13 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err := s.EnsureService(3, "node1", ns2); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Filter the services by the first node's meta value
_, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client"})
// Filter the services by the first node's meta value.
ws = memdb.NewWatchSet()
_, res, err = s.ServicesByNodeMeta(ws, map[string]string{"role": "client"})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -977,7 +1057,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
}
// Get all services using the common meta value
_, res, err = s.ServicesByNodeMeta(map[string]string{"common": "1"})
_, res, err = s.ServicesByNodeMeta(ws, map[string]string{"common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -990,7 +1070,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
}
// Get an empty list for an invalid meta value
_, res, err = s.ServicesByNodeMeta(map[string]string{"invalid": "nope"})
_, res, err = s.ServicesByNodeMeta(ws, map[string]string{"invalid": "nope"})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1000,7 +1080,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
}
// Get the first node's service instance using multiple meta filters
_, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client", "common": "1"})
_, res, err = s.ServicesByNodeMeta(ws, map[string]string{"role": "client", "common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1011,45 +1091,94 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if !reflect.DeepEqual(res, expected) {
t.Fatalf("bad: %v %v", res, expected)
}
// Sanity check the watch before we proceed.
if watchFired(ws) {
t.Fatalf("bad")
}
// Registering some unrelated node + service should not fire the watch.
testRegisterNode(t, s, 4, "nope")
testRegisterService(t, s, 5, "nope", "nope")
if watchFired(ws) {
t.Fatalf("bad")
}
// Overwhelm the service tracking.
idx = 6
for i := 0; i < 2*watchLimit; i++ {
node := fmt.Sprintf("many%d", i)
testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"})
idx++
testRegisterService(t, s, idx, node, "nope")
idx++
}
// Now get a fresh watch, which will be forced to watch the whole
// service table.
ws = memdb.NewWatchSet()
_, _, err = s.ServicesByNodeMeta(ws, map[string]string{"common": "1"})
if err != nil {
t.Fatalf("err: %s", err)
}
// Registering some unrelated node + service should not fire the watch.
testRegisterService(t, s, idx, "nope", "more-nope")
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ServiceNodes(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, nodes, err := s.ServiceNodes(ws, "db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if len(nodes) != 0 {
t.Fatalf("bad: %v", nodes)
}
// Create some nodes and services.
if err := s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(12, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(14, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(15, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
idx, nodes, err := s.ServiceNodes("db")
// Read everything back.
ws = memdb.NewWatchSet()
idx, nodes, err = s.ServiceNodes(ws, "db")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 16 {
t.Fatalf("bad: %v", 16)
t.Fatalf("bad: %d", idx)
}
if len(nodes) != 3 {
t.Fatalf("bad: %v", nodes)
@ -1069,7 +1198,6 @@ func TestStateStore_ServiceNodes(t *testing.T) {
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].Node != "bar" {
t.Fatalf("bad: %v", nodes)
}
@ -1085,7 +1213,6 @@ func TestStateStore_ServiceNodes(t *testing.T) {
if nodes[1].ServicePort != 8001 {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
@ -1101,32 +1228,88 @@ func TestStateStore_ServiceNodes(t *testing.T) {
if nodes[2].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
// Registering some unrelated node should not fire the watch.
testRegisterNode(t, s, 17, "nope")
if watchFired(ws) {
t.Fatalf("bad")
}
// But removing a node with the "db" service should fire the watch.
if err := s.DeleteNode(18, "bar"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Overwhelm the node tracking.
idx = 19
for i := 0; i < 2*watchLimit; i++ {
node := fmt.Sprintf("many%d", i)
if err := s.EnsureNode(idx, &structs.Node{Node: node, Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(idx, node, &structs.NodeService{ID: "db", Service: "db", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
idx++
}
// Now get a fresh watch, which will be forced to watch the whole nodes
// table.
ws = memdb.NewWatchSet()
_, _, err = s.ServiceNodes(ws, "db")
if err != nil {
t.Fatalf("err: %s", err)
}
// Registering some unrelated node should fire the watch now.
testRegisterNode(t, s, idx, "more-nope")
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ServiceTagNodes(t *testing.T) {
s := testStateStore(t)
// Listing with no results returns an empty list.
ws := memdb.NewWatchSet()
idx, nodes, err := s.ServiceTagNodes(ws, "db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad: %d", idx)
}
if len(nodes) != 0 {
t.Fatalf("bad: %v", nodes)
}
// Create some nodes and services.
if err := s.EnsureNode(15, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureNode(16, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(17, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(18, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.EnsureService(19, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
idx, nodes, err := s.ServiceTagNodes("db", "master")
// Read everything back.
ws = memdb.NewWatchSet()
idx, nodes, err = s.ServiceTagNodes(ws, "db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1148,6 +1331,20 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
// Registering some unrelated node should not fire the watch.
testRegisterNode(t, s, 20, "nope")
if watchFired(ws) {
t.Fatalf("bad")
}
// But removing a node with the "db:master" service should fire the watch.
if err := s.DeleteNode(21, "foo"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
@ -1173,7 +1370,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
t.Fatalf("err: %v", err)
}
idx, nodes, err := s.ServiceTagNodes("db", "master")
idx, nodes, err := s.ServiceTagNodes(nil, "db", "master")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1196,7 +1393,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
t.Fatalf("bad: %v", nodes)
}
idx, nodes, err = s.ServiceTagNodes("db", "v2")
idx, nodes, err = s.ServiceTagNodes(nil, "db", "v2")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1207,7 +1404,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
t.Fatalf("bad: %v", nodes)
}
idx, nodes, err = s.ServiceTagNodes("db", "dev")
idx, nodes, err = s.ServiceTagNodes(nil, "db", "dev")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1234,18 +1431,24 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
func TestStateStore_DeleteService(t *testing.T) {
s := testStateStore(t)
// Register a node with one service and a check
// Register a node with one service and a check.
testRegisterNode(t, s, 1, "node1")
testRegisterService(t, s, 2, "node1", "service1")
testRegisterCheck(t, s, 3, "node1", "service1", "check1", structs.HealthPassing)
// Delete the service
// Delete the service.
ws := memdb.NewWatchSet()
_, _, err := s.NodeServices(ws, "node1")
if err := s.DeleteService(4, "node1", "service1"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Service doesn't exist.
_, ns, err := s.NodeServices("node1")
ws = memdb.NewWatchSet()
_, ns, err := s.NodeServices(ws, "node1")
if err != nil || ns == nil || len(ns.Services) != 0 {
t.Fatalf("bad: %#v (err: %#v)", ns, err)
}
@ -1259,7 +1462,7 @@ func TestStateStore_DeleteService(t *testing.T) {
t.Fatalf("bad: %#v (err: %s)", check, err)
}
// Index tables were updated
// Index tables were updated.
if idx := s.maxIndex("services"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
@ -1268,13 +1471,16 @@ func TestStateStore_DeleteService(t *testing.T) {
}
// Deleting a nonexistent service should be idempotent and not return an
// error
// error, nor fire a watch.
if err := s.DeleteService(5, "node1", "service1"); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("services"); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}
func TestStateStore_Service_Snapshot(t *testing.T) {

View File

@ -30,6 +30,18 @@ var (
ErrMissingQueryID = errors.New("Missing Query ID")
)
const (
// watchLimit is used as a soft limit to cap how many watches we allow
// for a given blocking query. If this is exceeded, then we will use a
// higher-level watch that's less fine-grained. This isn't as bad as it
// seems since we have made the main culprits (nodes and services) more
// efficient by diffing before we update via register requests.
//
// Given the current size of aFew == 32 in memdb's watch_few.go, this
// will allow for up to ~64 goroutines per blocking query.
watchLimit = 2048
)
// StateStore is where we store all of Consul's state, including
// records of node registrations, services, checks, key/value
// pairs and more. The DB is entirely in-memory and is constructed

View File

@ -126,10 +126,11 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// 1 ms timeout since we already expect the event happened before calling
// this and just need to distinguish a fire from a timeout.
// timeout since we already expect the event happened before calling this and
// just need to distinguish a fire from a timeout. We do need a little time to
// allow the watch to set up any goroutines, though.
func watchFired(ws memdb.WatchSet) bool {
timedOut := ws.Watch(time.After(1 * time.Millisecond))
timedOut := ws.Watch(time.After(50 * time.Millisecond))
return !timedOut
}