From 0b2cd77bbef901d7aef93700fcc6aadae522f207 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 12 Dec 2013 11:37:19 -0800 Subject: [PATCH] Support querying for ServiceNodes --- consul/catalog_endpoint.go | 19 +++++++ consul/catalog_endpoint_test.go | 45 ++++++++++++++++ consul/state_store.go | 35 +++++++++++- consul/state_store_test.go | 95 +++++++++++++++++++++++++++++++++ rpc/structs.go | 17 ++++++ 5 files changed, 210 insertions(+), 1 deletion(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 5cf2abb0a7..16e9699e50 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -97,3 +97,22 @@ func (c *Catalog) ListServices(dc string, reply *rpc.Services) error { *reply = services return nil } + +// ServiceNodes returns all the nodes registered as part of a service +func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.ServiceNodes) error { + if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done { + return err + } + + // Get the nodes + state := c.srv.fsm.State() + var nodes rpc.ServiceNodes + if args.TagFilter { + nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag) + } else { + nodes = state.ServiceNodes(args.ServiceName) + } + + *reply = nodes + return nil +} diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index e2aa530bfc..8b929faae0 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -252,3 +252,48 @@ func TestCatalogListServices(t *testing.T) { t.Fatalf("bad: %v", out) } } + +func TestCatalogListServiceNodes(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + args := rpc.ServiceNodesRequest{ + Datacenter: "dc1", + ServiceName: "db", + ServiceTag: "slave", + TagFilter: false, + } + var out rpc.ServiceNodes + err := client.Call("Catalog.ServiceNodes", &args, &out) + if err == nil || err.Error() != "No cluster leader" { + t.Fatalf("err: %v", err) + } + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + // Just add a node + s1.fsm.State().EnsureNode("foo", "127.0.0.1") + s1.fsm.State().EnsureService("foo", "db", "primary", 5000) + + if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out) != 1 { + t.Fatalf("bad: %v", out) + } + + // Try with a filter + args.TagFilter = true + + if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out) != 0 { + t.Fatalf("bad: %v", out) + } +} diff --git a/consul/state_store.go b/consul/state_store.go index ab9afccb47..77da838117 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -3,6 +3,7 @@ package consul import ( "database/sql" "fmt" + "github.com/hashicorp/consul/rpc" _ "github.com/mattn/go-sqlite3" ) @@ -17,6 +18,8 @@ const ( queryDeleteNodeService queryDeleteNode queryServices + queryServiceNodes + queryServiceTagNodes ) // NoodeServices maps the Service name to a tag and port @@ -82,7 +85,7 @@ func (s *StateStore) initialize() error { tables := []string{ `CREATE TABLE nodes (name text unique, address text);`, `CREATE TABLE services (node text REFERENCES nodes(name) ON DELETE CASCADE, service text, tag text, port integer);`, - `CREATE INDEX servName ON services(service);`, + `CREATE INDEX servName ON services(service, tag);`, `CREATE INDEX nodeName ON services(node);`, } for _, t := range tables { @@ -101,6 +104,8 @@ func (s *StateStore) initialize() error { queryDeleteNodeService: "DELETE FROM services WHERE node=? AND service=?", queryDeleteNode: "DELETE FROM nodes WHERE name=?", queryServices: "SELECT DISTINCT service, tag FROM services", + queryServiceNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.node=n.name", + queryServiceTagNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.tag=? AND s.node=n.name", } for name, query := range queries { stmt, err := s.db.Prepare(query) @@ -240,3 +245,31 @@ func (s *StateStore) Services() map[string][]string { return services } + +// ServiceNodes returns the nodes associated with a given service +func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes { + stmt := s.prepared[queryServiceNodes] + return parseServiceNodes(stmt.Query(service)) +} + +// ServiceTagNodes returns the nodes associated with a given service matching a tag +func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes { + stmt := s.prepared[queryServiceTagNodes] + return parseServiceNodes(stmt.Query(service, tag)) +} + +// parseServiceNodes parses results from the queryServiceNodes / queryServiceTagNodes query +func parseServiceNodes(rows *sql.Rows, err error) rpc.ServiceNodes { + if err != nil { + panic(fmt.Errorf("Failed to get service nodes: %v", err)) + } + var nodes rpc.ServiceNodes + var node rpc.ServiceNode + for rows.Next() { + if err := rows.Scan(&node.Node, &node.Address, &node.ServiceTag, &node.ServicePort); err != nil { + panic(fmt.Errorf("Failed to get services: %v", err)) + } + nodes = append(nodes, node) + } + return nodes +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 62f616891b..4352751f13 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -200,3 +200,98 @@ func TestGetServices(t *testing.T) { t.Fatalf("Bad entry: %#v", tags) } } + +func TestServiceNodes(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + t.Fatalf("err: %v") + } + + nodes := store.ServiceNodes("db") + if len(nodes) != 2 { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Node != "foo" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServiceTag != "master" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } + + if nodes[1].Node != "bar" { + t.Fatalf("bad: %v", nodes) + } + if nodes[1].Address != "127.0.0.2" { + t.Fatalf("bad: %v", nodes) + } + if nodes[1].ServiceTag != "slave" { + t.Fatalf("bad: %v", nodes) + } + if nodes[1].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } +} + +func TestServiceTagNodes(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + t.Fatalf("err: %v") + } + + nodes := store.ServiceTagNodes("db", "master") + if len(nodes) != 1 { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Node != "foo" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Address != "127.0.0.1" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServiceTag != "master" { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].ServicePort != 8000 { + t.Fatalf("bad: %v", nodes) + } +} diff --git a/rpc/structs.go b/rpc/structs.go index 000c9e83d3..3c82e159c1 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -50,6 +50,23 @@ type Nodes []Node // Maps service name to available tags type Services map[string][]string +// ServiceNodesRequest is used to query the nodes of a service +type ServiceNodesRequest struct { + Datacenter string + ServiceName string + ServiceTag string + TagFilter bool // Controls tag filtering +} + +// ServiceNode represents a node that is part of a service +type ServiceNode struct { + Node string + Address string + ServiceTag string + ServicePort int +} +type ServiceNodes []ServiceNode + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle