diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index b885653593..e2075dbdd9 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -118,7 +118,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I } // ServiceNodes returns all the nodes registered as part of a service -func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.ServiceNodes) error { +func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { return err } @@ -130,19 +130,20 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Get the nodes state := c.srv.fsm.State() - var nodes structs.ServiceNodes - if args.TagFilter { - _, nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) - } else { - _, nodes = state.ServiceNodes(args.ServiceName) - } - - *reply = nodes - return nil + return c.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("ServiceNodes"), + func() (uint64, error) { + if args.TagFilter { + reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) + } else { + reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName) + } + return reply.Index, nil + }) } // NodeServices returns all the services registered as part of a node -func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.NodeServices) error { +func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done { return err } @@ -154,8 +155,10 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs // Get the node services state := c.srv.fsm.State() - _, services := state.NodeServices(args.Node) - - *reply = *services - return nil + return c.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("NodeServices"), + func() (uint64, error) { + reply.Index, reply.NodeServices = state.NodeServices(args.Node) + return reply.Index, nil + }) } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 0141576c05..9ebd079d1f 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -407,7 +407,7 @@ func TestCatalogListServiceNodes(t *testing.T) { ServiceTag: "slave", TagFilter: false, } - var out structs.ServiceNodes + var out structs.IndexedServiceNodes err := client.Call("Catalog.ServiceNodes", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) @@ -424,18 +424,18 @@ func TestCatalogListServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - if len(out) != 1 { + if len(out.ServiceNodes) != 1 { t.Fatalf("bad: %v", out) } // Try with a filter args.TagFilter = true - out = nil + out = structs.IndexedServiceNodes{} if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } - if len(out) != 0 { + if len(out.ServiceNodes) != 0 { t.Fatalf("bad: %v", out) } } @@ -451,7 +451,7 @@ func TestCatalogNodeServices(t *testing.T) { Datacenter: "dc1", Node: "foo", } - var out structs.NodeServices + var out structs.IndexedNodeServices err := client.Call("Catalog.NodeServices", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) @@ -469,16 +469,17 @@ func TestCatalogNodeServices(t *testing.T) { t.Fatalf("err: %v", err) } - if out.Node.Address != "127.0.0.1" { + if out.NodeServices.Node.Address != "127.0.0.1" { t.Fatalf("bad: %v", out) } - if len(out.Services) != 2 { + if len(out.NodeServices.Services) != 2 { t.Fatalf("bad: %v", out) } - if out.Services["db"].Tag != "primary" || out.Services["db"].Port != 5000 { + services := out.NodeServices.Services + if services["db"].Tag != "primary" || services["db"].Port != 5000 { t.Fatalf("bad: %v", out) } - if out.Services["web"].Tag != "" || out.Services["web"].Port != 80 { + if services["web"].Tag != "" || services["web"].Port != 80 { t.Fatalf("bad: %v", out) } } @@ -520,13 +521,13 @@ func TestCatalogRegister_FailedCase1(t *testing.T) { Datacenter: "dc1", ServiceName: "web", } - var nodes structs.ServiceNodes - if err := client.Call("Catalog.ServiceNodes", query, &nodes); err != nil { + var out2 structs.IndexedServiceNodes + if err := client.Call("Catalog.ServiceNodes", query, &out2); err != nil { t.Fatalf("err: %v", err) } // Check the output - if len(nodes) != 1 { - t.Fatalf("Bad: %v", nodes) + if len(out2.ServiceNodes) != 1 { + t.Fatalf("Bad: %v", out2) } } diff --git a/consul/state_store.go b/consul/state_store.go index ad7ff3d06e..275975bed9 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -195,8 +195,10 @@ func (s *StateStore) initialize() error { // Setup the query tables // TODO: Other queries... s.queryTables = map[string]MDBTables{ - "Nodes": MDBTables{s.nodeTable}, - "Services": MDBTables{s.serviceTable}, + "Nodes": MDBTables{s.nodeTable}, + "Services": MDBTables{s.serviceTable}, + "ServiceNodes": MDBTables{s.nodeTable, s.serviceTable}, + "NodeServices": MDBTables{s.nodeTable, s.serviceTable}, } return nil } @@ -298,7 +300,7 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe // NodeServices is used to return all the services of a given node func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) { - tables := MDBTables{s.nodeTable, s.serviceTable} + tables := s.queryTables["NodeServices"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) @@ -440,7 +442,7 @@ func (s *StateStore) Services() (uint64, map[string][]string) { // ServiceNodes returns the nodes associated with a given service func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) { - tables := MDBTables{s.nodeTable, s.serviceTable} + tables := s.queryTables["ServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) @@ -458,7 +460,7 @@ func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) // ServiceTagNodes returns the nodes associated with a given service matching a tag func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) { - tables := MDBTables{s.nodeTable, s.serviceTable} + tables := s.queryTables["ServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err))