agent: RPC changes and blocking query support

This commit is contained in:
Armon Dadgar 2014-02-05 14:36:13 -08:00
parent 1996deaa18
commit c58c53f448
8 changed files with 196 additions and 136 deletions

View File

@ -57,51 +57,43 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
return out, nil
}
func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
dc := s.agent.config.Datacenter
// Check for other DC
if other := req.URL.Query().Get("dc"); other != "" {
dc = other
func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Setup the request
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
var out structs.Nodes
if err := s.agent.RPC("Catalog.ListNodes", dc, &out); err != nil {
return nil, err
var out structs.IndexedNodes
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
return 0, nil, err
}
return out, nil
return out.Index, out.Nodes, nil
}
func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default DC
dc := s.agent.config.Datacenter
// Check for other DC
if other := req.URL.Query().Get("dc"); other != "" {
dc = other
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
var out structs.Services
if err := s.agent.RPC("Catalog.ListServices", dc, &out); err != nil {
return nil, err
var out structs.IndexedServices
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
return 0, nil, err
}
return out, nil
return out.Index, out.Services, nil
}
func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default DC
args := structs.ServiceSpecificRequest{
Datacenter: s.agent.config.Datacenter,
}
// Check for other DC
params := req.URL.Query()
if other := params.Get("dc"); other != "" {
args.Datacenter = other
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
// Check for a tag
params := req.URL.Query()
if _, ok := params["tag"]; ok {
args.ServiceTag = params.Get("tag")
args.TagFilter = true
@ -112,27 +104,22 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
if args.ServiceName == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing service name"))
return nil, nil
return 0, nil, nil
}
// Make the RPC request
var out structs.ServiceNodes
var out structs.IndexedServiceNodes
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
return nil, err
return 0, nil, err
}
return out, nil
return out.Index, out.ServiceNodes, nil
}
func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default Datacenter
args := structs.NodeSpecificRequest{
Datacenter: s.agent.config.Datacenter,
}
// Check for other DC
params := req.URL.Query()
if other := params.Get("dc"); other != "" {
args.Datacenter = other
args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
// Pull out the node name
@ -140,13 +127,13 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
if args.Node == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing node name"))
return nil, nil
return 0, nil, nil
}
// Make the RPC request
out := new(structs.NodeServices)
if err := s.agent.RPC("Catalog.NodeServices", &args, out); err != nil {
return nil, err
var out structs.IndexedNodeServices
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
return 0, nil, err
}
return out, nil
return out.Index, out.NodeServices, nil
}

View File

@ -114,11 +114,15 @@ func TestCatalogNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.CatalogNodes(nil, req)
idx, obj, err := srv.CatalogNodes(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
nodes := obj.(structs.Nodes)
if len(nodes) != 2 {
t.Fatalf("bad: %v", obj)
@ -153,11 +157,15 @@ func TestCatalogServices(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.CatalogServices(nil, req)
idx, obj, err := srv.CatalogServices(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
services := obj.(structs.Services)
if len(services) != 2 {
t.Fatalf("bad: %v", obj)
@ -193,11 +201,15 @@ func TestCatalogServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.CatalogServiceNodes(nil, req)
idx, obj, err := srv.CatalogServiceNodes(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
nodes := obj.(structs.ServiceNodes)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
@ -233,11 +245,15 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.CatalogNodeServices(nil, req)
idx, obj, err := srv.CatalogNodeServices(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
services := obj.(*structs.NodeServices)
if len(services.Services) != 1 {
t.Fatalf("bad: %v", obj)

View File

@ -257,7 +257,7 @@ func (d *DNSServer) nodeLookup(datacenter, node string, req, resp *dns.Msg) {
Datacenter: datacenter,
Node: node,
}
var out structs.NodeServices
var out structs.IndexedNodeServices
if err := d.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
d.logger.Printf("[ERR] dns: rpc error: %v", err)
resp.SetRcode(req, dns.RcodeServerFailure)
@ -265,15 +265,15 @@ func (d *DNSServer) nodeLookup(datacenter, node string, req, resp *dns.Msg) {
}
// If we have no address, return not found!
if out.Node.Address == "" {
if out.NodeServices.Node.Address == "" {
resp.SetRcode(req, dns.RcodeNameError)
return
}
// Parse the IP
ip := net.ParseIP(out.Node.Address)
ip := net.ParseIP(out.NodeServices.Node.Address)
if ip == nil {
d.logger.Printf("[ERR] dns: failed to parse IP %v", out.Node)
d.logger.Printf("[ERR] dns: failed to parse IP %v", out.NodeServices.Node)
resp.SetRcode(req, dns.RcodeServerFailure)
return
}
@ -302,7 +302,7 @@ func (d *DNSServer) serviceLookup(datacenter, service, tag string, req, resp *dn
ServiceTag: tag,
TagFilter: tag != "",
}
var out structs.CheckServiceNodes
var out structs.IndexedCheckServiceNodes
if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
d.logger.Printf("[ERR] dns: rpc error: %v", err)
resp.SetRcode(req, dns.RcodeServerFailure)
@ -310,21 +310,21 @@ func (d *DNSServer) serviceLookup(datacenter, service, tag string, req, resp *dn
}
// If we have no nodes, return not found!
if len(out) == 0 {
if len(out.Nodes) == 0 {
resp.SetRcode(req, dns.RcodeNameError)
return
}
// Filter out any service nodes due to health checks
out = d.filterServiceNodes(out)
out.Nodes = d.filterServiceNodes(out.Nodes)
// Add various responses depending on the request
qType := req.Question[0].Qtype
if qType == dns.TypeANY || qType == dns.TypeA {
d.serviceARecords(out, req, resp)
d.serviceARecords(out.Nodes, req, resp)
}
if qType == dns.TypeANY || qType == dns.TypeSRV {
d.serviceSRVRecords(datacenter, out, req, resp)
d.serviceSRVRecords(datacenter, out.Nodes, req, resp)
}
}

View File

@ -6,16 +6,11 @@ import (
"strings"
)
func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default DC
args := structs.ChecksInStateRequest{
Datacenter: s.agent.config.Datacenter,
}
// Check for other DC
params := req.URL.Query()
if other := params.Get("dc"); other != "" {
args.Datacenter = other
args := structs.ChecksInStateRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
// Pull out the service name
@ -23,27 +18,22 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
if args.State == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing check state"))
return nil, nil
return 0, nil, nil
}
// Make the RPC request
var out structs.HealthChecks
var out structs.IndexedHealthChecks
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
return nil, err
return 0, nil, err
}
return out, nil
return out.Index, out.HealthChecks, nil
}
func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default DC
args := structs.NodeSpecificRequest{
Datacenter: s.agent.config.Datacenter,
}
// Check for other DC
params := req.URL.Query()
if other := params.Get("dc"); other != "" {
args.Datacenter = other
args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
// Pull out the service name
@ -51,27 +41,22 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques
if args.Node == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing node name"))
return nil, nil
return 0, nil, nil
}
// Make the RPC request
var out structs.HealthChecks
var out structs.IndexedHealthChecks
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
return nil, err
return 0, nil, err
}
return out, nil
return out.Index, out.HealthChecks, nil
}
func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default DC
args := structs.ServiceSpecificRequest{
Datacenter: s.agent.config.Datacenter,
}
// Check for other DC
params := req.URL.Query()
if other := params.Get("dc"); other != "" {
args.Datacenter = other
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
// Pull out the service name
@ -79,30 +64,26 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
if args.ServiceName == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing service name"))
return nil, nil
return 0, nil, nil
}
// Make the RPC request
var out structs.HealthChecks
var out structs.IndexedHealthChecks
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
return nil, err
return 0, nil, err
}
return out, nil
return out.Index, out.HealthChecks, nil
}
func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error) {
// Set default DC
args := structs.ServiceSpecificRequest{
Datacenter: s.agent.config.Datacenter,
}
// Check for other DC
params := req.URL.Query()
if other := params.Get("dc"); other != "" {
args.Datacenter = other
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return 0, nil, nil
}
// Check for a tag
params := req.URL.Query()
if _, ok := params["tag"]; ok {
args.ServiceTag = params.Get("tag")
args.TagFilter = true
@ -113,13 +94,13 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
if args.ServiceName == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing service name"))
return nil, nil
return 0, nil, nil
}
// Make the RPC request
var out structs.CheckServiceNodes
var out structs.IndexedCheckServiceNodes
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
return 0, nil, err
}
return out, nil
return out.Index, out.Nodes, nil
}

View File

@ -23,11 +23,15 @@ func TestHealthChecksInState(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.HealthChecksInState(nil, req)
idx, obj, err := srv.HealthChecksInState(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for the server
nodes := obj.(structs.HealthChecks)
if len(nodes) != 1 {
@ -50,11 +54,15 @@ func TestHealthNodeChecks(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.HealthNodeChecks(nil, req)
idx, obj, err := srv.HealthNodeChecks(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for the server
nodes := obj.(structs.HealthChecks)
if len(nodes) != 1 {
@ -92,11 +100,15 @@ func TestHealthServiceChecks(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.HealthServiceChecks(nil, req)
idx, obj, err := srv.HealthServiceChecks(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for consul
nodes := obj.(structs.HealthChecks)
if len(nodes) != 1 {
@ -118,11 +130,15 @@ func TestHealthServiceNodes(t *testing.T) {
t.Fatalf("err: %v", err)
}
obj, err := srv.HealthServiceNodes(nil, req)
idx, obj, err := srv.HealthServiceNodes(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx == 0 {
t.Fatalf("bad: %v", idx)
}
// Should be 1 health check for consul
nodes := obj.(structs.CheckServiceNodes)
if len(nodes) != 1 {

View File

@ -3,10 +3,12 @@ package agent
import (
"bytes"
"encoding/json"
"github.com/hashicorp/consul/consul/structs"
"io"
"log"
"net"
"net/http"
"strconv"
"time"
)
@ -60,15 +62,15 @@ func (s *HTTPServer) registerHandlers() {
s.mux.HandleFunc("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.mux.HandleFunc("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
s.mux.HandleFunc("/v1/catalog/services", s.wrap(s.CatalogServices))
s.mux.HandleFunc("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
s.mux.HandleFunc("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
s.mux.HandleFunc("/v1/catalog/nodes", s.wrapQuery(s.CatalogNodes))
s.mux.HandleFunc("/v1/catalog/services", s.wrapQuery(s.CatalogServices))
s.mux.HandleFunc("/v1/catalog/service/", s.wrapQuery(s.CatalogServiceNodes))
s.mux.HandleFunc("/v1/catalog/node/", s.wrapQuery(s.CatalogNodeServices))
s.mux.HandleFunc("/v1/health/node/", s.wrap(s.HealthNodeChecks))
s.mux.HandleFunc("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
s.mux.HandleFunc("/v1/health/state/", s.wrap(s.HealthChecksInState))
s.mux.HandleFunc("/v1/health/service/", s.wrap(s.HealthServiceNodes))
s.mux.HandleFunc("/v1/health/node/", s.wrapQuery(s.HealthNodeChecks))
s.mux.HandleFunc("/v1/health/checks/", s.wrapQuery(s.HealthServiceChecks))
s.mux.HandleFunc("/v1/health/state/", s.wrapQuery(s.HealthChecksInState))
s.mux.HandleFunc("/v1/health/service/", s.wrapQuery(s.HealthServiceNodes))
s.mux.HandleFunc("/v1/agent/services", s.wrap(s.AgentServices))
s.mux.HandleFunc("/v1/agent/checks", s.wrap(s.AgentChecks))
@ -118,6 +120,16 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return f
}
// wrapQuery is used to wrap query functions to make them more convenient
func (s *HTTPServer) wrapQuery(handler func(resp http.ResponseWriter, req *http.Request) (uint64, interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
f := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
idx, obj, err := handler(resp, req)
setIndex(resp, idx)
return obj, err
}
return s.wrap(f)
}
// Renders a simple index page
func (s *HTTPServer) Index(resp http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/" {
@ -132,3 +144,49 @@ func decodeBody(req *http.Request, out interface{}) error {
dec := json.NewDecoder(req.Body)
return dec.Decode(out)
}
// setIndex is used to set the index response header
func setIndex(resp http.ResponseWriter, index uint64) {
resp.Header().Add("X-Consul-Index", strconv.FormatUint(index, 10))
}
// parseWait is used to parse the ?wait and ?index query params
// Returns true on error
func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.BlockingQuery) bool {
query := req.URL.Query()
if wait := query.Get("wait"); wait != "" {
dur, err := time.ParseDuration(wait)
if err != nil {
resp.WriteHeader(400)
resp.Write([]byte("Invalid wait time"))
return true
}
b.MaxQueryTime = dur
}
if idx := query.Get("index"); idx != "" {
index, err := strconv.ParseUint(idx, 10, 64)
if err != nil {
resp.WriteHeader(400)
resp.Write([]byte("Invalid index"))
return true
}
b.MinQueryIndex = index
}
return false
}
// parseDC is used to parse the ?dc query param
func (s *HTTPServer) parseDC(req *http.Request, dc *string) {
if other := req.URL.Query().Get("dc"); other != "" {
*dc = other
} else if *dc == "" {
*dc = s.agent.config.Datacenter
}
}
// parse is a convenience method for endpoints that need
// to use both parseWait and parseDC.
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.BlockingQuery) bool {
s.parseDC(req, dc)
return parseWait(resp, req, b)
}

View File

@ -216,14 +216,16 @@ func (l *localState) setSyncState() error {
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
}
var services structs.NodeServices
var checks structs.HealthChecks
if e := l.iface.RPC("Catalog.NodeServices", &req, &services); e != nil {
var out1 structs.IndexedNodeServices
var out2 structs.IndexedHealthChecks
if e := l.iface.RPC("Catalog.NodeServices", &req, &out1); e != nil {
return e
}
if err := l.iface.RPC("Health.NodeChecks", &req, &checks); err != nil {
if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil {
return err
}
services := out1.NodeServices
checks := out2.HealthChecks
l.Lock()
defer l.Unlock()

View File

@ -85,18 +85,18 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var services structs.NodeServices
var services structs.IndexedNodeServices
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
t.Fatalf("err: %v", err)
}
// We should have 4 services (consul included)
if len(services.Services) != 4 {
t.Fatalf("bad: %v", services.Services)
if len(services.NodeServices.Services) != 4 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}
// All the services should match
for id, serv := range services.Services {
for id, serv := range services.NodeServices.Services {
switch id {
case "mysql":
if !reflect.DeepEqual(serv, srv1) {
@ -208,18 +208,18 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Datacenter: "dc1",
Node: agent.config.NodeName,
}
var checks structs.HealthChecks
var checks structs.IndexedHealthChecks
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
t.Fatalf("err: %v", err)
}
// We should have 4 services (serf included)
if len(checks) != 4 {
if len(checks.HealthChecks) != 4 {
t.Fatalf("bad: %v", checks)
}
// All the checks should match
for _, chk := range checks {
for _, chk := range checks.HealthChecks {
switch chk.CheckID {
case "mysql":
if !reflect.DeepEqual(chk, chk1) {