diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 8ee16c2a48..dfa71f2608 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/types" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" ) @@ -79,7 +80,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Check the complete register request against the given ACL policy. if acl != nil && c.srv.config.ACLEnforceVersion8 { state := c.srv.fsm.State() - _, ns, err := state.NodeServices(args.Node) + _, ns, err := state.NodeServices(nil, args.Node) if err != nil { return fmt.Errorf("Node lookup failed: %v", err) } @@ -164,18 +165,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde // Get the list of nodes. state := c.srv.fsm.State() - return c.srv.blockingRPC( + return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("Nodes"), - func() error { + func(ws memdb.WatchSet) error { var index uint64 var nodes structs.Nodes var err error if len(args.NodeMetaFilters) > 0 { - index, nodes, err = state.NodesByMeta(args.NodeMetaFilters) + index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters) } else { - index, nodes, err = state.Nodes() + index, nodes, err = state.Nodes(ws) } if err != nil { return err @@ -197,18 +197,17 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I // Get the list of services and their tags. 
state := c.srv.fsm.State() - return c.srv.blockingRPC( + return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("Services"), - func() error { + func(ws memdb.WatchSet) error { var index uint64 var services structs.Services var err error if len(args.NodeMetaFilters) > 0 { - index, services, err = state.ServicesByNodeMeta(args.NodeMetaFilters) + index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters) } else { - index, services, err = state.Services() + index, services, err = state.Services(ws) } if err != nil { return err @@ -232,18 +231,17 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Get the nodes state := c.srv.fsm.State() - err := c.srv.blockingRPC( + err := c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("ServiceNodes"), - func() error { + func(ws memdb.WatchSet) error { var index uint64 var services structs.ServiceNodes var err error if args.TagFilter { - index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) + index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag) } else { - index, services, err = state.ServiceNodes(args.ServiceName) + index, services, err = state.ServiceNodes(ws, args.ServiceName) } if err != nil { return err @@ -290,12 +288,11 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() - return c.srv.blockingRPC( + return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("NodeServices"), - func() error { - index, services, err := state.NodeServices(args.Node) + func(ws memdb.WatchSet) error { + index, services, err := state.NodeServices(ws, args.Node) if err != nil { return err } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 2f63fd89ac..496416d37f 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -84,7 +84,7 @@ func TestFSM_RegisterNode(t 
*testing.T) { } // Verify service registered - _, services, err := fsm.state.NodeServices("foo") + _, services, err := fsm.state.NodeServices(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -137,7 +137,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { } // Verify service registered - _, services, err := fsm.state.NodeServices("foo") + _, services, err := fsm.state.NodeServices(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -207,7 +207,7 @@ func TestFSM_DeregisterService(t *testing.T) { } // Verify service not registered - _, services, err := fsm.state.NodeServices("foo") + _, services, err := fsm.state.NodeServices(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -335,7 +335,7 @@ func TestFSM_DeregisterNode(t *testing.T) { } // Verify service not registered - _, services, err := fsm.state.NodeServices("foo") + _, services, err := fsm.state.NodeServices(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } @@ -449,7 +449,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Verify the contents - _, nodes, err := fsm2.state.Nodes() + _, nodes, err := fsm2.state.Nodes(nil) if err != nil { t.Fatalf("err: %s", err) } @@ -468,7 +468,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { t.Fatalf("bad: %v", nodes[1]) } - _, fooSrv, err := fsm2.state.NodeServices("foo") + _, fooSrv, err := fsm2.state.NodeServices(nil, "foo") if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/leader.go b/consul/leader.go index 4efededfee..93505753ac 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -287,7 +287,7 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error { } // Get the node services, look for ConsulServiceID - _, services, err := state.NodeServices(check.Node) + _, services, err := state.NodeServices(nil, check.Node) if err != nil { return err } @@ -385,7 +385,7 @@ func (s *Server) handleAliveMember(member serf.Member) error { // Check if the associated service is available if service != nil { match := false - _, 
services, err := state.NodeServices(member.Name) + _, services, err := state.NodeServices(nil, member.Name) if err != nil { return err } diff --git a/consul/leader_test.go b/consul/leader_test.go index 6e0f6d5f33..3ff724090f 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -71,7 +71,7 @@ func TestLeader_RegisterMember(t *testing.T) { } // Service should be registered - _, services, err := state.NodeServices(s1.config.NodeName) + _, services, err := state.NodeServices(nil, s1.config.NodeName) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/state/catalog.go b/consul/state/catalog.go index 7768e89390..3c56382430 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -201,18 +201,19 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { +func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) + idx := maxIndexTxn(tx, "nodes") // Retrieve all of the nodes nodes, err := tx.Get("nodes", "id") if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } + ws.Add(nodes.WatchCh()) // Create and return the nodes list. var results structs.Nodes @@ -223,12 +224,12 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) { } // NodesByMeta is used to return all nodes with the given metadata key/value pairs. -func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nodes, error) { +func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...) 
+ idx := maxIndexTxn(tx, "nodes") // Retrieve all of the nodes var args []interface{} @@ -240,6 +241,7 @@ func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nod if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } + ws.Add(nodes.WatchCh()) // Create and return the nodes list. var results structs.Nodes @@ -422,18 +424,19 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa } // Services returns all services along with a list of associated tags. -func (s *StateStore) Services() (uint64, structs.Services, error) { +func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("Services")...) + idx := maxIndexTxn(tx, "services") // List all the services. services, err := tx.Get("services", "id") if err != nil { return 0, nil, fmt.Errorf("failed querying services: %s", err) } + ws.Add(services.WatchCh()) // Rip through the services and enumerate them and their unique set of // tags. @@ -462,12 +465,12 @@ func (s *StateStore) Services() (uint64, structs.Services, error) { } // ServicesByNodeMeta returns all services, filtered by the given node metadata. -func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, structs.Services, error) { +func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...) 
+ idx := maxIndexTxn(tx, "services", "nodes") // Retrieve all of the nodes with the meta k/v pair var args []interface{} @@ -479,6 +482,15 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru if err != nil { return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) } + ws.Add(nodes.WatchCh()) + + // We don't want to track an unlimited number of services, so we pull a + // top-level watch to use as a fallback. + allServices, err := tx.Get("services", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed services lookup: %s", err) + } + allServicesCh := allServices.WatchCh() // Populate the services map unique := make(map[string]map[string]struct{}) @@ -492,6 +504,7 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru if err != nil { return 0, nil, fmt.Errorf("failed querying services: %s", err) } + ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh) // Rip through the services and enumerate them and their unique set of // tags. @@ -520,25 +533,27 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru } // ServiceNodes returns the nodes associated with a given service name. -func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNodes, error) { +func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...) + idx := maxIndexTxn(tx, "nodes", "services") // List all the services. services, err := tx.Get("services", "service", serviceName) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } + ws.Add(services.WatchCh()) + var results structs.ServiceNodes for service := services.Next(); service != nil; service = services.Next() { results = append(results, service.(*structs.ServiceNode)) } // Fill in the address details. 
- results, err = s.parseServiceNodes(tx, results) + results, err = s.parseServiceNodes(tx, ws, results) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } @@ -547,18 +562,19 @@ func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNo // ServiceTagNodes returns the nodes associated with a given service, filtering // out services that don't contain the given tag. -func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes, error) { +func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...) + idx := maxIndexTxn(tx, "nodes", "services") // List all the services. services, err := tx.Get("services", "service", service) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } + ws.Add(services.WatchCh()) // Gather all the services and apply the tag filter. var results structs.ServiceNodes @@ -570,7 +586,7 @@ func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.Servi } // Fill in the address details. - results, err = s.parseServiceNodes(tx, results) + results, err = s.parseServiceNodes(tx, ws, results) if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } @@ -595,7 +611,16 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool { // parseServiceNodes iterates over a services query and fills in the node details, // returning a ServiceNodes slice. -func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (structs.ServiceNodes, error) { +func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { + // We don't want to track an unlimited number of nodes, so we pull a + // top-level watch to use as a fallback. 
+ allNodes, err := tx.Get("nodes", "id") + if err != nil { + return nil, fmt.Errorf("failed nodes lookup: %s", err) + } + allNodesCh := allNodes.WatchCh() + + // Fill in the node data for each service instance. var results structs.ServiceNodes for _, sn := range services { // Note that we have to clone here because we don't want to @@ -604,10 +629,11 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo s := sn.PartialClone() // Grab the corresponding node record. - n, err := tx.First("nodes", "id", sn.Node) + watchCh, n, err := tx.FirstWatch("nodes", "id", sn.Node) if err != nil { return nil, fmt.Errorf("failed node lookup: %s", err) } + ws.AddWithLimit(watchLimit, watchCh, allNodesCh) // Populate the node-related fields. The tagged addresses may be // used by agents to perform address translation if they are @@ -646,18 +672,19 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st } // NodeServices is used to query service registrations by node ID. -func (s *StateStore) NodeServices(nodeName string) (uint64, *structs.NodeServices, error) { +func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeName string) (uint64, *structs.NodeServices, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("NodeServices")...) 
+ idx := maxIndexTxn(tx, "nodes", "services") // Query the node - n, err := tx.First("nodes", "id", nodeName) + watchCh, n, err := tx.FirstWatch("nodes", "id", nodeName) if err != nil { return 0, nil, fmt.Errorf("node lookup failed: %s", err) } + ws.Add(watchCh) if n == nil { return 0, nil, nil } @@ -668,6 +695,7 @@ func (s *StateStore) NodeServices(nodeName string) (uint64, *structs.NodeService if err != nil { return 0, nil, fmt.Errorf("failed querying services for node %q: %s", nodeName, err) } + ws.Add(services.WatchCh()) // Initialize the node services struct ns := &structs.NodeServices{ diff --git a/consul/state/catalog_test.go b/consul/state/catalog_test.go index 957f09eb04..f4793efe38 100644 --- a/consul/state/catalog_test.go +++ b/consul/state/catalog_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" + "github.com/hashicorp/go-memdb" ) func TestStateStore_EnsureRegistration(t *testing.T) { @@ -60,7 +61,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) { // Verify that the service got registered. verifyService := func() { - idx, out, err := s.NodeServices("node1") + idx, out, err := s.NodeServices(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -219,7 +220,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) { // Verify that the service got registered. verifyService := func() { - idx, out, err := s.NodeServices("node1") + idx, out, err := s.NodeServices(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -481,34 +482,39 @@ func TestStateStore_EnsureNode(t *testing.T) { func TestStateStore_GetNodes(t *testing.T) { s := testStateStore(t) - // Listing with no results returns nil - idx, res, err := s.Nodes() + // Listing with no results returns nil. 
+ ws := memdb.NewWatchSet() + idx, res, err := s.Nodes(ws) if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } - // Create some nodes in the state store + // Create some nodes in the state store. testRegisterNode(t, s, 0, "node0") testRegisterNode(t, s, 1, "node1") testRegisterNode(t, s, 2, "node2") + if !watchFired(ws) { + t.Fatalf("bad") + } - // Retrieve the nodes - idx, nodes, err := s.Nodes() + // Retrieve the nodes. + ws = memdb.NewWatchSet() + idx, nodes, err := s.Nodes(ws) if err != nil { t.Fatalf("err: %s", err) } - // Highest index was returned + // Highest index was returned. if idx != 2 { t.Fatalf("bad index: %d", idx) } - // All nodes were returned + // All nodes were returned. if n := len(nodes); n != 3 { t.Fatalf("bad node count: %d", n) } - // Make sure the nodes match + // Make sure the nodes match. for i, node := range nodes { if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) { t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex) @@ -518,6 +524,17 @@ func TestStateStore_GetNodes(t *testing.T) { t.Fatalf("bad: %#v", node) } } + + // Make sure a node delete fires the watch. 
+ if watchFired(ws) { + t.Fatalf("bad") + } + if err := s.DeleteNode(3, "node1"); err != nil { + t.Fatalf("err: %s", err) + } + if !watchFired(ws) { + t.Fatalf("bad") + } } func BenchmarkGetNodes(b *testing.B) { @@ -533,8 +550,9 @@ func BenchmarkGetNodes(b *testing.B) { b.Fatalf("err: %v", err) } + ws := memdb.NewWatchSet() for i := 0; i < b.N; i++ { - s.Nodes() + s.Nodes(ws) } } @@ -542,15 +560,19 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { s := testStateStore(t) // Listing with no results returns nil - idx, res, err := s.NodesByMeta(map[string]string{"somekey": "somevalue"}) + ws := memdb.NewWatchSet() + idx, res, err := s.NodesByMeta(ws, map[string]string{"somekey": "somevalue"}) if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } - // Create some nodes in the state store + // Create some nodes in the state store. testRegisterNodeWithMeta(t, s, 0, "node0", map[string]string{"role": "client"}) testRegisterNodeWithMeta(t, s, 1, "node1", map[string]string{"role": "client", "common": "1"}) testRegisterNodeWithMeta(t, s, 2, "node2", map[string]string{"role": "server", "common": "1"}) + if !watchFired(ws) { + t.Fatalf("bad") + } cases := []struct { filters map[string]string @@ -579,7 +601,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { } for _, tc := range cases { - _, result, err := s.NodesByMeta(tc.filters) + _, result, err := s.NodesByMeta(nil, tc.filters) if err != nil { t.Fatalf("bad: %v", err) } @@ -594,23 +616,24 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { } } } -} -func BenchmarkGetNodesByMeta(b *testing.B) { - s, err := NewStateStore(nil) + // Set up a watch. 
+ ws = memdb.NewWatchSet() + _, _, err = s.NodesByMeta(ws, map[string]string{"role": "client"}) if err != nil { - b.Fatalf("err: %s", err) + t.Fatalf("err: %v", err) } - if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { - b.Fatalf("err: %v", err) - } - if err := s.EnsureNode(101, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { - b.Fatalf("err: %v", err) + // Make an unrelated modification and make sure the watch doesn't fire. + testRegisterNodeWithMeta(t, s, 3, "node3", map[string]string{"foo": "bar"}) + if watchFired(ws) { + t.Fatalf("bad") } - for i := 0; i < b.N; i++ { - s.Nodes() + // Change a watched key and make sure it fires. + testRegisterNodeWithMeta(t, s, 4, "node0", map[string]string{"role": "different"}) + if !watchFired(ws) { + t.Fatalf("bad") } } @@ -766,13 +789,14 @@ func TestStateStore_Node_Watches(t *testing.T) { func TestStateStore_EnsureService(t *testing.T) { s := testStateStore(t) - // Fetching services for a node with none returns nil - idx, res, err := s.NodeServices("node1") + // Fetching services for a node with none returns nil. + ws := memdb.NewWatchSet() + idx, res, err := s.NodeServices(ws, "node1") if err != nil || res != nil || idx != 0 { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } - // Create the service registration + // Create the service registration. ns1 := &structs.NodeService{ ID: "service1", Service: "redis", @@ -781,21 +805,35 @@ func TestStateStore_EnsureService(t *testing.T) { Port: 1111, } - // Creating a service without a node returns an error + // Creating a service without a node returns an error. if err := s.EnsureService(1, "node1", ns1); err != ErrMissingNode { t.Fatalf("expected %#v, got: %#v", ErrMissingNode, err) } + if watchFired(ws) { + t.Fatalf("bad") + } - // Register the nodes + // Register the nodes. 
testRegisterNode(t, s, 0, "node1") testRegisterNode(t, s, 1, "node2") + if !watchFired(ws) { + t.Fatalf("bad") + } - // Service successfully registers into the state store + // Service successfully registers into the state store. + ws = memdb.NewWatchSet() + _, _, err = s.NodeServices(ws, "node1") + if err != nil { + t.Fatalf("err: %v", err) + } if err = s.EnsureService(10, "node1", ns1); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } - // Register a similar service against both nodes + // Register a similar service against both nodes. ns2 := *ns1 ns2.ID = "service2" for _, n := range []string{"node1", "node2"} { @@ -804,15 +842,24 @@ func TestStateStore_EnsureService(t *testing.T) { } } - // Register a different service on the bad node + // Register a different service on the bad node. + ws = memdb.NewWatchSet() + _, _, err = s.NodeServices(ws, "node1") + if err != nil { + t.Fatalf("err: %v", err) + } ns3 := *ns1 ns3.ID = "service3" if err := s.EnsureService(30, "node2", &ns3); err != nil { t.Fatalf("err: %s", err) } + if watchFired(ws) { + t.Fatalf("bad") + } - // Retrieve the services - idx, out, err := s.NodeServices("node1") + // Retrieve the services. + ws = memdb.NewWatchSet() + idx, out, err := s.NodeServices(ws, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -820,12 +867,12 @@ func TestStateStore_EnsureService(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Only the services for the requested node are returned + // Only the services for the requested node are returned. if out == nil || len(out.Services) != 2 { t.Fatalf("bad services: %#v", out) } - // Results match the inserted services and have the proper indexes set + // Results match the inserted services and have the proper indexes set. 
expect1 := *ns1 expect1.CreateIndex, expect1.ModifyIndex = 10, 10 if svc := out.Services["service1"]; !reflect.DeepEqual(&expect1, svc) { @@ -838,19 +885,22 @@ func TestStateStore_EnsureService(t *testing.T) { t.Fatalf("bad: %#v %#v", ns2, svc) } - // Index tables were updated + // Index tables were updated. if idx := s.maxIndex("services"); idx != 30 { t.Fatalf("bad index: %d", idx) } - // Update a service registration + // Update a service registration. ns1.Address = "1.1.1.2" if err := s.EnsureService(40, "node1", ns1); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } - // Retrieve the service again and ensure it matches - idx, out, err = s.NodeServices("node1") + // Retrieve the service again and ensure it matches. + idx, out, err = s.NodeServices(nil, "node1") if err != nil { t.Fatalf("err: %s", err) } @@ -866,7 +916,7 @@ func TestStateStore_EnsureService(t *testing.T) { t.Fatalf("bad: %#v", svc) } - // Index tables were updated + // Index tables were updated. if idx := s.maxIndex("services"); idx != 40 { t.Fatalf("bad index: %d", idx) } @@ -875,6 +925,19 @@ func TestStateStore_EnsureService(t *testing.T) { func TestStateStore_Services(t *testing.T) { s := testStateStore(t) + // Listing with no results returns an empty list. + ws := memdb.NewWatchSet() + idx, services, err := s.Services(ws) + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if len(services) != 0 { + t.Fatalf("bad: %v", services) + } + // Register several nodes and services. testRegisterNode(t, s, 1, "node1") ns1 := &structs.NodeService{ @@ -899,9 +962,13 @@ func TestStateStore_Services(t *testing.T) { if err := s.EnsureService(5, "node2", ns2); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Pull all the services.
- idx, services, err := s.Services() + ws = memdb.NewWatchSet() + idx, services, err = s.Services(ws) if err != nil { t.Fatalf("err: %s", err) } @@ -922,18 +989,27 @@ func TestStateStore_Services(t *testing.T) { if !reflect.DeepEqual(expected, services) { t.Fatalf("bad: %#v", services) } + + // Deleting a node with a service should fire the watch. + if err := s.DeleteNode(6, "node1"); err != nil { + t.Fatalf("err: %s", err) + } + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ServicesByNodeMeta(t *testing.T) { s := testStateStore(t) - // Listing with no results returns nil - idx, res, err := s.ServicesByNodeMeta(map[string]string{"somekey": "somevalue"}) + // Listing with no results returns nil. + ws := memdb.NewWatchSet() + idx, res, err := s.ServicesByNodeMeta(ws, map[string]string{"somekey": "somevalue"}) if idx != 0 || len(res) != 0 || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } - // Create some nodes and services in the state store + // Create some nodes and services in the state store. node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}} if err := s.EnsureNode(0, node0); err != nil { t.Fatalf("err: %v", err) @@ -962,9 +1038,13 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { if err := s.EnsureService(3, "node1", ns2); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } - // Filter the services by the first node's meta value - _, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client"}) + // Filter the services by the first node's meta value. 
+ ws = memdb.NewWatchSet() + _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"role": "client"}) if err != nil { t.Fatalf("err: %s", err) } @@ -977,7 +1057,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { } // Get all services using the common meta value - _, res, err = s.ServicesByNodeMeta(map[string]string{"common": "1"}) + _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"common": "1"}) if err != nil { t.Fatalf("err: %s", err) } @@ -990,7 +1070,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { } // Get an empty list for an invalid meta value - _, res, err = s.ServicesByNodeMeta(map[string]string{"invalid": "nope"}) + _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"invalid": "nope"}) if err != nil { t.Fatalf("err: %s", err) } @@ -1000,7 +1080,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { } // Get the first node's service instance using multiple meta filters - _, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client", "common": "1"}) + _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"role": "client", "common": "1"}) if err != nil { t.Fatalf("err: %s", err) } @@ -1011,45 +1091,94 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { if !reflect.DeepEqual(res, expected) { t.Fatalf("bad: %v %v", res, expected) } + + // Sanity check the watch before we proceed. + if watchFired(ws) { + t.Fatalf("bad") + } + + // Registering some unrelated node + service should not fire the watch. + testRegisterNode(t, s, 4, "nope") + testRegisterService(t, s, 5, "nope", "nope") + if watchFired(ws) { + t.Fatalf("bad") + } + + // Overwhelm the service tracking. + idx = 6 + for i := 0; i < 2*watchLimit; i++ { + node := fmt.Sprintf("many%d", i) + testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) + idx++ + testRegisterService(t, s, idx, node, "nope") + idx++ + } + + // Now get a fresh watch, which will be forced to watch the whole + // service table. 
+ ws = memdb.NewWatchSet() + _, _, err = s.ServicesByNodeMeta(ws, map[string]string{"common": "1"}) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Registering some unrelated service should fire the watch now. + testRegisterService(t, s, idx, "nope", "more-nope") + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ServiceNodes(t *testing.T) { s := testStateStore(t) + // Listing with no results returns an empty list. + ws := memdb.NewWatchSet() + idx, nodes, err := s.ServiceNodes(ws, "db") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if len(nodes) != 0 { + t.Fatalf("bad: %v", nodes) + } + + // Create some nodes and services. if err := s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(12, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(14, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(15, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil { t.Fatalf("err: %v", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } - idx, nodes, err := s.ServiceNodes("db") + // Read everything back.
+ ws = memdb.NewWatchSet() + idx, nodes, err = s.ServiceNodes(ws, "db") if err != nil { t.Fatalf("err: %s", err) } if idx != 16 { - t.Fatalf("bad: %v", 16) + t.Fatalf("bad: %d", idx) } if len(nodes) != 3 { t.Fatalf("bad: %v", nodes) @@ -1069,7 +1198,6 @@ func TestStateStore_ServiceNodes(t *testing.T) { if nodes[0].ServicePort != 8000 { t.Fatalf("bad: %v", nodes) } - if nodes[1].Node != "bar" { t.Fatalf("bad: %v", nodes) } @@ -1085,7 +1213,6 @@ func TestStateStore_ServiceNodes(t *testing.T) { if nodes[1].ServicePort != 8001 { t.Fatalf("bad: %v", nodes) } - if nodes[2].Node != "foo" { t.Fatalf("bad: %v", nodes) } @@ -1101,32 +1228,88 @@ func TestStateStore_ServiceNodes(t *testing.T) { if nodes[2].ServicePort != 8000 { t.Fatalf("bad: %v", nodes) } + + // Registering some unrelated node should not fire the watch. + testRegisterNode(t, s, 17, "nope") + if watchFired(ws) { + t.Fatalf("bad") + } + + // But removing a node with the "db" service should fire the watch. + if err := s.DeleteNode(18, "bar"); err != nil { + t.Fatalf("err: %s", err) + } + if !watchFired(ws) { + t.Fatalf("bad") + } + + // Overwhelm the node tracking. + idx = 19 + for i := 0; i < 2*watchLimit; i++ { + node := fmt.Sprintf("many%d", i) + if err := s.EnsureNode(idx, &structs.Node{Node: node, Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s.EnsureService(idx, node, &structs.NodeService{ID: "db", Service: "db", Port: 8000}); err != nil { + t.Fatalf("err: %v", err) + } + idx++ + } + + // Now get a fresh watch, which will be forced to watch the whole nodes + // table. + ws = memdb.NewWatchSet() + _, _, err = s.ServiceNodes(ws, "db") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Registering some unrelated node should fire the watch now. + testRegisterNode(t, s, idx, "more-nope") + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ServiceTagNodes(t *testing.T) { s := testStateStore(t) + // Listing with no results returns an empty list. 
+ ws := memdb.NewWatchSet() + idx, nodes, err := s.ServiceTagNodes(ws, "db", "master") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if len(nodes) != 0 { + t.Fatalf("bad: %v", nodes) + } + + // Create some nodes and services. if err := s.EnsureNode(15, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureNode(16, &structs.Node{Node: "bar", Address: "127.0.0.2"}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(17, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"master"}, Address: "", Port: 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(18, "foo", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}); err != nil { t.Fatalf("err: %v", err) } - if err := s.EnsureService(19, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8000}); err != nil { t.Fatalf("err: %v", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } - idx, nodes, err := s.ServiceTagNodes("db", "master") + // Read everything back. + ws = memdb.NewWatchSet() + idx, nodes, err = s.ServiceTagNodes(ws, "db", "master") if err != nil { t.Fatalf("err: %s", err) } @@ -1148,6 +1331,20 @@ func TestStateStore_ServiceTagNodes(t *testing.T) { if nodes[0].ServicePort != 8000 { t.Fatalf("bad: %v", nodes) } + + // Registering some unrelated node should not fire the watch. + testRegisterNode(t, s, 20, "nope") + if watchFired(ws) { + t.Fatalf("bad") + } + + // But removing a node with the "db:master" service should fire the watch. 
+ if err := s.DeleteNode(21, "foo"); err != nil { + t.Fatalf("err: %s", err) + } + if !watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) { @@ -1173,7 +1370,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) { t.Fatalf("err: %v", err) } - idx, nodes, err := s.ServiceTagNodes("db", "master") + idx, nodes, err := s.ServiceTagNodes(nil, "db", "master") if err != nil { t.Fatalf("err: %s", err) } @@ -1196,7 +1393,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) { t.Fatalf("bad: %v", nodes) } - idx, nodes, err = s.ServiceTagNodes("db", "v2") + idx, nodes, err = s.ServiceTagNodes(nil, "db", "v2") if err != nil { t.Fatalf("err: %s", err) } @@ -1207,7 +1404,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) { t.Fatalf("bad: %v", nodes) } - idx, nodes, err = s.ServiceTagNodes("db", "dev") + idx, nodes, err = s.ServiceTagNodes(nil, "db", "dev") if err != nil { t.Fatalf("err: %s", err) } @@ -1234,18 +1431,24 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) { func TestStateStore_DeleteService(t *testing.T) { s := testStateStore(t) - // Register a node with one service and a check + // Register a node with one service and a check. testRegisterNode(t, s, 1, "node1") testRegisterService(t, s, 2, "node1", "service1") testRegisterCheck(t, s, 3, "node1", "service1", "check1", structs.HealthPassing) - // Delete the service + // Delete the service. + ws := memdb.NewWatchSet() + _, _, err := s.NodeServices(ws, "node1") if err := s.DeleteService(4, "node1", "service1"); err != nil { t.Fatalf("err: %s", err) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Service doesn't exist. 
- _, ns, err := s.NodeServices("node1") + ws = memdb.NewWatchSet() + _, ns, err := s.NodeServices(ws, "node1") if err != nil || ns == nil || len(ns.Services) != 0 { t.Fatalf("bad: %#v (err: %#v)", ns, err) } @@ -1259,7 +1462,7 @@ func TestStateStore_DeleteService(t *testing.T) { t.Fatalf("bad: %#v (err: %s)", check, err) } - // Index tables were updated + // Index tables were updated. if idx := s.maxIndex("services"); idx != 4 { t.Fatalf("bad index: %d", idx) } @@ -1268,13 +1471,16 @@ func TestStateStore_DeleteService(t *testing.T) { } // Deleting a nonexistent service should be idempotent and not return an - // error + // error, nor fire a watch. if err := s.DeleteService(5, "node1", "service1"); err != nil { t.Fatalf("err: %s", err) } if idx := s.maxIndex("services"); idx != 4 { t.Fatalf("bad index: %d", idx) } + if watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_Service_Snapshot(t *testing.T) { diff --git a/consul/state/state_store.go b/consul/state/state_store.go index dc72726564..0b8ce76154 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -30,6 +30,18 @@ var ( ErrMissingQueryID = errors.New("Missing Query ID") ) +const ( + // watchLimit is used as a soft limit to cap how many watches we allow + // for a given blocking query. If this is exceeded, then we will use a + // higher-level watch that's less fine-grained. This isn't as bad as it + // seems since we have made the main culprits (nodes and services) more + // efficient by diffing before we update via register requests. + // + // Given the current size of aFew == 32 in memdb's watch_few.go, this + // will allow for up to ~64 goroutines per blocking query. + watchLimit = 2048 +) + // StateStore is where we store all of Consul's state, including // records of node registrations, services, checks, key/value // pairs and more. 
The DB is entirely in-memory and is constructed diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 95f385cc7d..a28d4dd656 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -126,10 +126,11 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) { // watchFired is a helper for unit tests that returns if the given watch set // fired (it doesn't care which watch actually fired). This uses a fixed -// 1 ms timeout since we already expect the event happened before calling -// this and just need to distinguish a fire from a timeout. +// timeout since we already expect the event happened before calling this and +// just need to distinguish a fire from a timeout. We do need a little time to +// allow the watch to set up any goroutines, though. func watchFired(ws memdb.WatchSet) bool { - timedOut := ws.Watch(time.After(1 * time.Millisecond)) + timedOut := ws.Watch(time.After(50 * time.Millisecond)) return !timedOut }