Adding NodeServices endpoing

This commit is contained in:
Armon Dadgar 2013-12-12 11:46:25 -08:00
parent 0b2cd77bbe
commit d17b32d165
4 changed files with 70 additions and 10 deletions

View File

@ -116,3 +116,17 @@ func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.Service
*reply = nodes *reply = nodes
return nil return nil
} }
// NodeServices returns all the services registered as part of a node
func (c *Catalog) NodeServices(args *rpc.NodeServicesRequest, reply *rpc.NodeServices) error {
if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done {
return err
}
// Get the node services
state := c.srv.fsm.State()
services := state.NodeServices(args.Node)
*reply = services
return nil
}

View File

@ -297,3 +297,43 @@ func TestCatalogListServiceNodes(t *testing.T) {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }
func TestCatalogNodeServices(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
args := rpc.NodeServicesRequest{
Datacenter: "dc1",
Node: "foo",
}
var out rpc.NodeServices
err := client.Call("Catalog.NodeServices", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
// Wait for leader
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode("foo", "127.0.0.1")
s1.fsm.State().EnsureService("foo", "db", "primary", 5000)
s1.fsm.State().EnsureService("foo", "web", "", 80)
if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out) != 2 {
t.Fatalf("bad: %v", out)
}
if out["db"].Tag != "primary" || out["db"].Port != 5000 {
t.Fatalf("bad: %v", out)
}
if out["web"].Tag != "" || out["web"].Port != 80 {
t.Fatalf("bad: %v", out)
}
}

View File

@ -22,13 +22,6 @@ const (
queryServiceTagNodes queryServiceTagNodes
) )
// NoodeServices maps the Service name to a tag and port
type ServiceEntry struct {
Tag string
Port int
}
type NodeServices map[string]ServiceEntry
// The StateStore is responsible for maintaining all the Consul // The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency // state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide // through the use of Raft. The goals of the StateStore are to provide
@ -191,16 +184,16 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
} }
// NodeServices is used to return all the services of a given node // NodeServices is used to return all the services of a given node
func (s *StateStore) NodeServices(name string) NodeServices { func (s *StateStore) NodeServices(name string) rpc.NodeServices {
stmt := s.prepared[queryNodeServices] stmt := s.prepared[queryNodeServices]
rows, err := stmt.Query(name) rows, err := stmt.Query(name)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err)) panic(fmt.Errorf("Failed to get node services: %v", err))
} }
services := NodeServices(make(map[string]ServiceEntry)) services := rpc.NodeServices(make(map[string]rpc.NodeService))
var service string var service string
var entry ServiceEntry var entry rpc.NodeService
for rows.Next() { for rows.Next() {
if err := rows.Scan(&service, &entry.Tag, &entry.Port); err != nil { if err := rows.Scan(&service, &entry.Tag, &entry.Port); err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err)) panic(fmt.Errorf("Failed to get node services: %v", err))

View File

@ -67,6 +67,19 @@ type ServiceNode struct {
} }
type ServiceNodes []ServiceNode type ServiceNodes []ServiceNode
// NodeServiceRequest is used to request the services of a node
type NodeServicesRequest struct {
Datacenter string
Node string
}
// NodeService is a service provided by a node
type NodeService struct {
Tag string
Port int
}
type NodeServices map[string]NodeService
// Decode is used to decode a MsgPack encoded object // Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error { func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle var handle codec.MsgpackHandle