From 47cd33ae46860c8420999743f172fa749c7a721e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 12 Dec 2013 11:07:14 -0800 Subject: [PATCH] Adding ability to list services --- consul/catalog_endpoint.go | 14 ++++++++++ consul/catalog_endpoint_test.go | 35 ++++++++++++++++++++++++ consul/state_store.go | 27 +++++++++++++++++++ consul/state_store_test.go | 48 +++++++++++++++++++++++++++++++++ consul/util.go | 11 ++++++++ consul/util_test.go | 15 +++++++++++ rpc/structs.go | 4 +++ 7 files changed, 154 insertions(+) create mode 100644 consul/util.go create mode 100644 consul/util_test.go diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 75ff84ebbb..5cf2abb0a7 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -83,3 +83,17 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error { *reply = nodes return nil } + +// ListServices is used to query the services in a DC +func (c *Catalog) ListServices(dc string, reply *rpc.Services) error { + if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done { + return err + } + + // Get the current nodes + state := c.srv.fsm.State() + services := state.Services() + + *reply = services + return nil +} diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index b6e07fd2be..e2aa530bfc 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -217,3 +217,38 @@ func TestCatalogListNodes(t *testing.T) { t.Fatalf("bad: %v", out) } } + +func TestCatalogListServices(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + var out rpc.Services + err := client.Call("Catalog.ListServices", "dc1", &out) + if err == nil || err.Error() != "No cluster leader" { + t.Fatalf("err: %v", err) + } + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + // Just add a node + s1.fsm.State().EnsureNode("foo", "127.0.0.1") + s1.fsm.State().EnsureService("foo", "db", "primary", 5000) + + if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out) != 1 { + t.Fatalf("bad: %v", out) + } + if len(out["db"]) != 1 { + t.Fatalf("bad: %v", out) + } + if out["db"][0] != "primary" { + t.Fatalf("bad: %v", out) + } +} diff --git a/consul/state_store.go b/consul/state_store.go index 761fd476e8..dbd95f977f 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -16,6 +16,7 @@ const ( queryNodeServices queryDeleteNodeService queryDeleteNode + queryServices ) // NoodeServices maps the Service name to a tag and port @@ -99,6 +100,7 @@ func (s *StateStore) initialize() error { queryNodeServices: "SELECT service, tag, port from services where node=?", queryDeleteNodeService: "DELETE FROM services WHERE node=? AND service=?", queryDeleteNode: "DELETE FROM nodes WHERE name=?", + queryServices: "SELECT DISTINCT service, tag FROM services", } for name, query := range queries { stmt, err := s.db.Prepare(query) @@ -215,3 +217,28 @@ func (s *StateStore) DeleteNode(node string) error { stmt := s.prepared[queryDeleteNode] return s.checkDelete(stmt.Exec(node)) } + +// Services is used to return all the services with a list of associated tags +func (s *StateStore) Services() map[string][]string { + stmt := s.prepared[queryServices] + rows, err := stmt.Query() + if err != nil { + panic(fmt.Errorf("Failed to get services: %v", err)) + } + + services := make(map[string][]string) + var service, tag string + for rows.Next() { + if err := rows.Scan(&service, &tag); err != nil { + panic(fmt.Errorf("Failed to get services: %v", err)) + } + + tags := services[service] + if !strContains(tags, tag) { + tags = append(tags, tag) + } + services[service] = tags + } + + return services +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index cc3a8c915f..62f616891b 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1,6 +1,7 @@ package consul import ( + "sort" "testing" ) @@ -152,3 +153,50 @@ func TestDeleteNode(t *testing.T) { t.Fatalf("found node") } } + +func TestGetServices(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "api", "", 5000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + t.Fatalf("err: %v") + } + + services := store.Services() + + tags, ok := services["api"] + if !ok { + t.Fatalf("missing api: %#v", services) + } + if len(tags) != 1 || tags[0] != "" { + t.Fatalf("Bad entry: %#v", tags) + } + + tags, ok = services["db"] + sort.Strings(tags) + if !ok { + t.Fatalf("missing db: %#v", services) + } + if len(tags) != 2 || tags[0] != "master" || tags[1] != "slave" { + t.Fatalf("Bad entry: %#v", tags) + } +} diff --git a/consul/util.go b/consul/util.go new file mode 100644 index 0000000000..6bc1940618 --- /dev/null +++ b/consul/util.go @@ -0,0 +1,11 @@ +package consul + +// strContains checks if a list contains a string +func strContains(l []string, s string) bool { + for _, v := range l { + if v == s { + return true + } + } + return false +} diff --git a/consul/util_test.go b/consul/util_test.go new file mode 100644 index 0000000000..2511ef2116 --- /dev/null +++ b/consul/util_test.go @@ -0,0 +1,15 @@ +package consul + +import ( + "testing" +) + +func TestStrContains(t *testing.T) { + l := []string{"a", "b", "c"} + if !strContains(l, "b") { + t.Fatalf("should contain") + } + if strContains(l, "d") { + t.Fatalf("should not contain") + } +} diff --git a/rpc/structs.go b/rpc/structs.go index 487e254446..000c9e83d3 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -46,6 +46,10 @@ type Node struct { } type Nodes []Node +// Used to return information about a provided services. +// Maps service name to available tags +type Services map[string][]string + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle