Support querying for ServiceNodes

This commit is contained in:
Armon Dadgar 2013-12-12 11:37:19 -08:00
parent 5b40ba49bf
commit 0b2cd77bbe
5 changed files with 210 additions and 1 deletions

View File

@ -97,3 +97,22 @@ func (c *Catalog) ListServices(dc string, reply *rpc.Services) error {
*reply = services *reply = services
return nil return nil
} }
// ServiceNodes returns all the nodes registered as part of a service
func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.ServiceNodes) error {
if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done {
return err
}
// Get the nodes
state := c.srv.fsm.State()
var nodes rpc.ServiceNodes
if args.TagFilter {
nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
nodes = state.ServiceNodes(args.ServiceName)
}
*reply = nodes
return nil
}

View File

@ -252,3 +252,48 @@ func TestCatalogListServices(t *testing.T) {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }
func TestCatalogListServiceNodes(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
args := rpc.ServiceNodesRequest{
Datacenter: "dc1",
ServiceName: "db",
ServiceTag: "slave",
TagFilter: false,
}
var out rpc.ServiceNodes
err := client.Call("Catalog.ServiceNodes", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
// Wait for leader
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode("foo", "127.0.0.1")
s1.fsm.State().EnsureService("foo", "db", "primary", 5000)
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out) != 1 {
t.Fatalf("bad: %v", out)
}
// Try with a filter
args.TagFilter = true
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out) != 0 {
t.Fatalf("bad: %v", out)
}
}

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/hashicorp/consul/rpc"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
@ -17,6 +18,8 @@ const (
queryDeleteNodeService queryDeleteNodeService
queryDeleteNode queryDeleteNode
queryServices queryServices
queryServiceNodes
queryServiceTagNodes
) )
// NoodeServices maps the Service name to a tag and port // NoodeServices maps the Service name to a tag and port
@ -82,7 +85,7 @@ func (s *StateStore) initialize() error {
tables := []string{ tables := []string{
`CREATE TABLE nodes (name text unique, address text);`, `CREATE TABLE nodes (name text unique, address text);`,
`CREATE TABLE services (node text REFERENCES nodes(name) ON DELETE CASCADE, service text, tag text, port integer);`, `CREATE TABLE services (node text REFERENCES nodes(name) ON DELETE CASCADE, service text, tag text, port integer);`,
`CREATE INDEX servName ON services(service);`, `CREATE INDEX servName ON services(service, tag);`,
`CREATE INDEX nodeName ON services(node);`, `CREATE INDEX nodeName ON services(node);`,
} }
for _, t := range tables { for _, t := range tables {
@ -101,6 +104,8 @@ func (s *StateStore) initialize() error {
queryDeleteNodeService: "DELETE FROM services WHERE node=? AND service=?", queryDeleteNodeService: "DELETE FROM services WHERE node=? AND service=?",
queryDeleteNode: "DELETE FROM nodes WHERE name=?", queryDeleteNode: "DELETE FROM nodes WHERE name=?",
queryServices: "SELECT DISTINCT service, tag FROM services", queryServices: "SELECT DISTINCT service, tag FROM services",
queryServiceNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.node=n.name",
queryServiceTagNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.tag=? AND s.node=n.name",
} }
for name, query := range queries { for name, query := range queries {
stmt, err := s.db.Prepare(query) stmt, err := s.db.Prepare(query)
@ -240,3 +245,31 @@ func (s *StateStore) Services() map[string][]string {
return services return services
} }
// ServiceNodes returns the nodes associated with a given service
func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes {
stmt := s.prepared[queryServiceNodes]
return parseServiceNodes(stmt.Query(service))
}
// ServiceTagNodes returns the nodes associated with a given service matching a tag
func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes {
stmt := s.prepared[queryServiceTagNodes]
return parseServiceNodes(stmt.Query(service, tag))
}
// parseServiceNodes parses results from the queryServiceNodes / queryServiceTagNodes query
func parseServiceNodes(rows *sql.Rows, err error) rpc.ServiceNodes {
if err != nil {
panic(fmt.Errorf("Failed to get service nodes: %v", err))
}
var nodes rpc.ServiceNodes
var node rpc.ServiceNode
for rows.Next() {
if err := rows.Scan(&node.Node, &node.Address, &node.ServiceTag, &node.ServicePort); err != nil {
panic(fmt.Errorf("Failed to get services: %v", err))
}
nodes = append(nodes, node)
}
return nodes
}

View File

@ -200,3 +200,98 @@ func TestGetServices(t *testing.T) {
t.Fatalf("Bad entry: %#v", tags) t.Fatalf("Bad entry: %#v", tags)
} }
} }
func TestServiceNodes(t *testing.T) {
store, err := NewStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "master", 8000); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v")
}
nodes := store.ServiceNodes("db")
if len(nodes) != 2 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServiceTag != "master" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].Node != "bar" {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].Address != "127.0.0.2" {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].ServiceTag != "slave" {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
}
func TestServiceTagNodes(t *testing.T) {
store, err := NewStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "master", 8000); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v")
}
nodes := store.ServiceTagNodes("db", "master")
if len(nodes) != 1 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "foo" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServiceTag != "master" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes)
}
}

View File

@ -50,6 +50,23 @@ type Nodes []Node
// Maps service name to available tags // Maps service name to available tags
type Services map[string][]string type Services map[string][]string
// ServiceNodesRequest is used to query the nodes of a service
type ServiceNodesRequest struct {
Datacenter string
ServiceName string
ServiceTag string
TagFilter bool // Controls tag filtering
}
// ServiceNode represents a node that is part of a service
type ServiceNode struct {
Node string
Address string
ServiceTag string
ServicePort int
}
type ServiceNodes []ServiceNode
// Decode is used to decode a MsgPack encoded object // Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error { func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle var handle codec.MsgpackHandle