From 4cee14f58a988baad70c0905e90859a7d1277481 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 5 Feb 2014 13:30:18 -0800 Subject: [PATCH] consul: Health endpoints support blocking queries --- consul/health_endpoint.go | 54 ++++++++++++++++++++-------------- consul/health_endpoint_test.go | 20 ++++++++----- consul/state_store.go | 16 ++++++---- consul/structs/structs.go | 5 ++++ 4 files changed, 59 insertions(+), 36 deletions(-) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index 3ae115e558..eb568b96d1 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -12,35 +12,41 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, - reply *structs.HealthChecks) error { + reply *structs.IndexedHealthChecks) error { if done, err := h.srv.forward("Health.ChecksInState", args.Datacenter, args, reply); done { return err } // Get the state specific checks state := h.srv.fsm.State() - _, checks := state.ChecksInState(args.State) - *reply = checks - return nil + return h.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("ChecksInState"), + func() (uint64, error) { + reply.Index, reply.HealthChecks = state.ChecksInState(args.State) + return reply.Index, nil + }) } // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, - reply *structs.HealthChecks) error { + reply *structs.IndexedHealthChecks) error { if done, err := h.srv.forward("Health.NodeChecks", args.Datacenter, args, reply); done { return err } // Get the node checks state := h.srv.fsm.State() - _, checks := state.NodeChecks(args.Node) - *reply = checks - return nil + return h.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("NodeChecks"), + func() (uint64, error) { + reply.Index, reply.HealthChecks = state.NodeChecks(args.Node) + return reply.Index, nil + }) } // ServiceChecks is used to get all the checks for a service func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, - reply *structs.HealthChecks) error { + reply *structs.IndexedHealthChecks) error { // Reject if tag filtering is on if args.TagFilter { return fmt.Errorf("Tag filtering is not supported") @@ -53,13 +59,16 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // Get the service checks state := h.srv.fsm.State() - _, checks := state.ServiceChecks(args.ServiceName) - *reply = checks - return nil + return h.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("ServiceChecks"), + func() (uint64, error) { + reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName) + return reply.Index, nil + }) } // ServiceNodes returns all the nodes registered as part of a service including health info -func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.CheckServiceNodes) error { +func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { if done, err := h.srv.forward("Health.ServiceNodes", args.Datacenter, args, reply); done { return err } @@ -71,13 +80,14 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc // Get the nodes state := h.srv.fsm.State() - var nodes structs.CheckServiceNodes - if args.TagFilter { - _, nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) - } else { - _, nodes = state.CheckServiceNodes(args.ServiceName) - } - - *reply = nodes - return nil + return h.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("CheckServiceNodes"), + func() (uint64, error) { + if args.TagFilter { + reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag) + } else { + reply.Index, reply.Nodes = state.CheckServiceNodes(args.ServiceName) + } + return reply.Index, nil + }) } diff --git a/consul/health_endpoint_test.go b/consul/health_endpoint_test.go index 39dc4b0e94..000b57ee47 100644 --- a/consul/health_endpoint_test.go +++ b/consul/health_endpoint_test.go @@ -31,15 +31,16 @@ func TestHealth_ChecksInState(t *testing.T) { t.Fatalf("err: %v", err) } - var checks structs.HealthChecks + var out2 structs.IndexedHealthChecks inState := structs.ChecksInStateRequest{ Datacenter: "dc1", State: structs.HealthPassing, } - if err := client.Call("Health.ChecksInState", &inState, &checks); err != nil { + if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil { t.Fatalf("err: %v", err) } + checks := out2.HealthChecks if len(checks) != 2 { t.Fatalf("Bad: %v", checks) } @@ -77,15 +78,16 @@ func TestHealth_NodeChecks(t *testing.T) { t.Fatalf("err: %v", err) } - var checks structs.HealthChecks + var out2 structs.IndexedHealthChecks node := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: "foo", } - if err := client.Call("Health.NodeChecks", &node, &checks); err != nil { + if err := client.Call("Health.NodeChecks", &node, &out2); err != nil { t.Fatalf("err: %v", err) } + checks := out2.HealthChecks if len(checks) != 1 { t.Fatalf("Bad: %v", checks) } @@ -123,15 +125,16 @@ func TestHealth_ServiceChecks(t *testing.T) { t.Fatalf("err: %v", err) } - var checks structs.HealthChecks + var out2 structs.IndexedHealthChecks node := structs.ServiceSpecificRequest{ Datacenter: "dc1", ServiceName: "db", } - if err := client.Call("Health.ServiceChecks", &node, &checks); err != nil { + if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil { t.Fatalf("err: %v", err) } + checks := out2.HealthChecks if len(checks) != 1 { t.Fatalf("Bad: %v", checks) } @@ -189,17 +192,18 @@ func TestHealth_ServiceNodes(t *testing.T) { t.Fatalf("err: %v", err) } - var nodes structs.CheckServiceNodes + var out2 structs.IndexedCheckServiceNodes req := structs.ServiceSpecificRequest{ Datacenter: "dc1", ServiceName: "db", ServiceTag: "master", TagFilter: false, } - if err := client.Call("Health.ServiceNodes", &req, &nodes); err != nil { + if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil { t.Fatalf("err: %v", err) } + nodes := out2.Nodes if len(nodes) != 2 { t.Fatalf("Bad: %v", nodes) } diff --git a/consul/state_store.go b/consul/state_store.go index 275975bed9..1714e713e6 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -195,10 +195,14 @@ func (s *StateStore) initialize() error { // Setup the query tables // TODO: Other queries... s.queryTables = map[string]MDBTables{ - "Nodes": MDBTables{s.nodeTable}, - "Services": MDBTables{s.serviceTable}, - "ServiceNodes": MDBTables{s.nodeTable, s.serviceTable}, - "NodeServices": MDBTables{s.nodeTable, s.serviceTable}, + "Nodes": MDBTables{s.nodeTable}, + "Services": MDBTables{s.serviceTable}, + "ServiceNodes": MDBTables{s.nodeTable, s.serviceTable}, + "NodeServices": MDBTables{s.nodeTable, s.serviceTable}, + "ChecksInState": MDBTables{s.checkTable}, + "NodeChecks": MDBTables{s.checkTable}, + "ServiceChecks": MDBTables{s.checkTable}, + "CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable}, } return nil } @@ -598,7 +602,7 @@ func parseHealthChecks(idx uint64, res []interface{}, err error) (uint64, struct // CheckServiceNodes returns the nodes associated with a given service, along // with any associated check func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes) { - tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable} + tables := s.queryTables["CheckServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) @@ -617,7 +621,7 @@ func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckSer // CheckServiceNodes returns the nodes associated with a given service, along // with any associated checks func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes) { - tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable} + tables := s.queryTables["CheckServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 8b44596a49..0e085dcb80 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -167,6 +167,11 @@ type IndexedHealthChecks struct { HealthChecks HealthChecks } +type IndexedCheckServiceNodes struct { + Index uint64 + Nodes CheckServiceNodes +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle