consul: Support blocking queries for the Catalog

This commit is contained in:
Armon Dadgar 2014-02-05 11:10:10 -08:00
parent 1d1dd8f8d2
commit 89bffaf467
3 changed files with 39 additions and 33 deletions

View File

@ -118,7 +118,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
} }
// ServiceNodes returns all the nodes registered as part of a service // ServiceNodes returns all the nodes registered as part of a service
func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.ServiceNodes) error { func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error {
if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done {
return err return err
} }
@ -130,19 +130,20 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// Get the nodes // Get the nodes
state := c.srv.fsm.State() state := c.srv.fsm.State()
var nodes structs.ServiceNodes return c.srv.blockingRPC(&args.BlockingQuery,
if args.TagFilter { state.QueryTables("ServiceNodes"),
_, nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) func() (uint64, error) {
} else { if args.TagFilter {
_, nodes = state.ServiceNodes(args.ServiceName) reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} } else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName)
*reply = nodes }
return nil return reply.Index, nil
})
} }
// NodeServices returns all the services registered as part of a node // NodeServices returns all the services registered as part of a node
func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.NodeServices) error { func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error {
if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done {
return err return err
} }
@ -154,8 +155,10 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
// Get the node services // Get the node services
state := c.srv.fsm.State() state := c.srv.fsm.State()
_, services := state.NodeServices(args.Node) return c.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("NodeServices"),
*reply = *services func() (uint64, error) {
return nil reply.Index, reply.NodeServices = state.NodeServices(args.Node)
return reply.Index, nil
})
} }

View File

@ -407,7 +407,7 @@ func TestCatalogListServiceNodes(t *testing.T) {
ServiceTag: "slave", ServiceTag: "slave",
TagFilter: false, TagFilter: false,
} }
var out structs.ServiceNodes var out structs.IndexedServiceNodes
err := client.Call("Catalog.ServiceNodes", &args, &out) err := client.Call("Catalog.ServiceNodes", &args, &out)
if err == nil || err.Error() != "No cluster leader" { if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -424,18 +424,18 @@ func TestCatalogListServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if len(out) != 1 { if len(out.ServiceNodes) != 1 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
// Try with a filter // Try with a filter
args.TagFilter = true args.TagFilter = true
out = nil out = structs.IndexedServiceNodes{}
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if len(out) != 0 { if len(out.ServiceNodes) != 0 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }
@ -451,7 +451,7 @@ func TestCatalogNodeServices(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
} }
var out structs.NodeServices var out structs.IndexedNodeServices
err := client.Call("Catalog.NodeServices", &args, &out) err := client.Call("Catalog.NodeServices", &args, &out)
if err == nil || err.Error() != "No cluster leader" { if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -469,16 +469,17 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if out.Node.Address != "127.0.0.1" { if out.NodeServices.Node.Address != "127.0.0.1" {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
if len(out.Services) != 2 { if len(out.NodeServices.Services) != 2 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
if out.Services["db"].Tag != "primary" || out.Services["db"].Port != 5000 { services := out.NodeServices.Services
if services["db"].Tag != "primary" || services["db"].Port != 5000 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
if out.Services["web"].Tag != "" || out.Services["web"].Port != 80 { if services["web"].Tag != "" || services["web"].Port != 80 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }
@ -520,13 +521,13 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
ServiceName: "web", ServiceName: "web",
} }
var nodes structs.ServiceNodes var out2 structs.IndexedServiceNodes
if err := client.Call("Catalog.ServiceNodes", query, &nodes); err != nil { if err := client.Call("Catalog.ServiceNodes", query, &out2); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Check the output // Check the output
if len(nodes) != 1 { if len(out2.ServiceNodes) != 1 {
t.Fatalf("Bad: %v", nodes) t.Fatalf("Bad: %v", out2)
} }
} }

View File

@ -195,8 +195,10 @@ func (s *StateStore) initialize() error {
// Setup the query tables // Setup the query tables
// TODO: Other queries... // TODO: Other queries...
s.queryTables = map[string]MDBTables{ s.queryTables = map[string]MDBTables{
"Nodes": MDBTables{s.nodeTable}, "Nodes": MDBTables{s.nodeTable},
"Services": MDBTables{s.serviceTable}, "Services": MDBTables{s.serviceTable},
"ServiceNodes": MDBTables{s.nodeTable, s.serviceTable},
"NodeServices": MDBTables{s.nodeTable, s.serviceTable},
} }
return nil return nil
} }
@ -298,7 +300,7 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe
// NodeServices is used to return all the services of a given node // NodeServices is used to return all the services of a given node
func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) { func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) {
tables := MDBTables{s.nodeTable, s.serviceTable} tables := s.queryTables["NodeServices"]
tx, err := tables.StartTxn(true) tx, err := tables.StartTxn(true)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err)) panic(fmt.Errorf("Failed to start txn: %v", err))
@ -440,7 +442,7 @@ func (s *StateStore) Services() (uint64, map[string][]string) {
// ServiceNodes returns the nodes associated with a given service // ServiceNodes returns the nodes associated with a given service
func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) { func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) {
tables := MDBTables{s.nodeTable, s.serviceTable} tables := s.queryTables["ServiceNodes"]
tx, err := tables.StartTxn(true) tx, err := tables.StartTxn(true)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err)) panic(fmt.Errorf("Failed to start txn: %v", err))
@ -458,7 +460,7 @@ func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes)
// ServiceTagNodes returns the nodes associated with a given service matching a tag // ServiceTagNodes returns the nodes associated with a given service matching a tag
func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) { func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) {
tables := MDBTables{s.nodeTable, s.serviceTable} tables := s.queryTables["ServiceNodes"]
tx, err := tables.StartTxn(true) tx, err := tables.StartTxn(true)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err)) panic(fmt.Errorf("Failed to start txn: %v", err))