mirror of https://github.com/status-im/consul.git
Adding ability to list services
This commit is contained in:
parent
7915a23531
commit
47cd33ae46
|
@ -83,3 +83,17 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
|
|||
*reply = nodes
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListServices is used to query the services in a DC
|
||||
func (c *Catalog) ListServices(dc string, reply *rpc.Services) error {
|
||||
if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the current nodes
|
||||
state := c.srv.fsm.State()
|
||||
services := state.Services()
|
||||
|
||||
*reply = services
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -217,3 +217,38 @@ func TestCatalogListNodes(t *testing.T) {
|
|||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalogListServices(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
var out rpc.Services
|
||||
err := client.Call("Catalog.ListServices", "dc1", &out)
|
||||
if err == nil || err.Error() != "No cluster leader" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Wait for leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode("foo", "127.0.0.1")
|
||||
s1.fsm.State().EnsureService("foo", "db", "primary", 5000)
|
||||
|
||||
if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(out) != 1 {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
if len(out["db"]) != 1 {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
if out["db"][0] != "primary" {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ const (
|
|||
queryNodeServices
|
||||
queryDeleteNodeService
|
||||
queryDeleteNode
|
||||
queryServices
|
||||
)
|
||||
|
||||
// NoodeServices maps the Service name to a tag and port
|
||||
|
@ -99,6 +100,7 @@ func (s *StateStore) initialize() error {
|
|||
queryNodeServices: "SELECT service, tag, port from services where node=?",
|
||||
queryDeleteNodeService: "DELETE FROM services WHERE node=? AND service=?",
|
||||
queryDeleteNode: "DELETE FROM nodes WHERE name=?",
|
||||
queryServices: "SELECT DISTINCT service, tag FROM services",
|
||||
}
|
||||
for name, query := range queries {
|
||||
stmt, err := s.db.Prepare(query)
|
||||
|
@ -215,3 +217,28 @@ func (s *StateStore) DeleteNode(node string) error {
|
|||
stmt := s.prepared[queryDeleteNode]
|
||||
return s.checkDelete(stmt.Exec(node))
|
||||
}
|
||||
|
||||
// Services is used to return all the services with a list of associated tags
|
||||
func (s *StateStore) Services() map[string][]string {
|
||||
stmt := s.prepared[queryServices]
|
||||
rows, err := stmt.Query()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Failed to get services: %v", err))
|
||||
}
|
||||
|
||||
services := make(map[string][]string)
|
||||
var service, tag string
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&service, &tag); err != nil {
|
||||
panic(fmt.Errorf("Failed to get services: %v", err))
|
||||
}
|
||||
|
||||
tags := services[service]
|
||||
if !strContains(tags, tag) {
|
||||
tags = append(tags, tag)
|
||||
}
|
||||
services[service] = tags
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
@ -152,3 +153,50 @@ func TestDeleteNode(t *testing.T) {
|
|||
t.Fatalf("found node")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetServices(t *testing.T) {
|
||||
store, err := NewStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureService("foo", "api", "", 5000); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureService("foo", "db", "master", 8000); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
services := store.Services()
|
||||
|
||||
tags, ok := services["api"]
|
||||
if !ok {
|
||||
t.Fatalf("missing api: %#v", services)
|
||||
}
|
||||
if len(tags) != 1 || tags[0] != "" {
|
||||
t.Fatalf("Bad entry: %#v", tags)
|
||||
}
|
||||
|
||||
tags, ok = services["db"]
|
||||
sort.Strings(tags)
|
||||
if !ok {
|
||||
t.Fatalf("missing db: %#v", services)
|
||||
}
|
||||
if len(tags) != 2 || tags[0] != "master" || tags[1] != "slave" {
|
||||
t.Fatalf("Bad entry: %#v", tags)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
package consul
|
||||
|
||||
// strContains checks if a list contains a string
|
||||
func strContains(l []string, s string) bool {
|
||||
for _, v := range l {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStrContains(t *testing.T) {
|
||||
l := []string{"a", "b", "c"}
|
||||
if !strContains(l, "b") {
|
||||
t.Fatalf("should contain")
|
||||
}
|
||||
if strContains(l, "d") {
|
||||
t.Fatalf("should not contain")
|
||||
}
|
||||
}
|
|
@ -46,6 +46,10 @@ type Node struct {
|
|||
}
|
||||
type Nodes []Node
|
||||
|
||||
// Used to return information about a provided services.
|
||||
// Maps service name to available tags
|
||||
type Services map[string][]string
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
func Decode(buf []byte, out interface{}) error {
|
||||
var handle codec.MsgpackHandle
|
||||
|
|
Loading…
Reference in New Issue