mirror of https://github.com/status-im/consul.git
consul: Moving QueryMeta handling into blockingRPC
This commit is contained in:
parent
a2acbe732e
commit
beeeb86a12
|
@ -98,9 +98,9 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
||||||
// Get the local state
|
// Get the local state
|
||||||
state := c.srv.fsm.State()
|
state := c.srv.fsm.State()
|
||||||
return c.srv.blockingRPC(&args.BlockingQuery,
|
return c.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("Nodes"),
|
state.QueryTables("Nodes"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
c.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
reply.Index, reply.Nodes = state.Nodes()
|
reply.Index, reply.Nodes = state.Nodes()
|
||||||
return reply.Index, nil
|
return reply.Index, nil
|
||||||
})
|
})
|
||||||
|
@ -115,9 +115,9 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
|
||||||
// Get the current nodes
|
// Get the current nodes
|
||||||
state := c.srv.fsm.State()
|
state := c.srv.fsm.State()
|
||||||
return c.srv.blockingRPC(&args.BlockingQuery,
|
return c.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("Services"),
|
state.QueryTables("Services"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
c.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
reply.Index, reply.Services = state.Services()
|
reply.Index, reply.Services = state.Services()
|
||||||
return reply.Index, nil
|
return reply.Index, nil
|
||||||
})
|
})
|
||||||
|
@ -137,9 +137,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
||||||
// Get the nodes
|
// Get the nodes
|
||||||
state := c.srv.fsm.State()
|
state := c.srv.fsm.State()
|
||||||
err := c.srv.blockingRPC(&args.BlockingQuery,
|
err := c.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("ServiceNodes"),
|
state.QueryTables("ServiceNodes"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
c.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
if args.TagFilter {
|
if args.TagFilter {
|
||||||
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
|
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
|
||||||
} else {
|
} else {
|
||||||
|
@ -175,9 +175,9 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
|
||||||
// Get the node services
|
// Get the node services
|
||||||
state := c.srv.fsm.State()
|
state := c.srv.fsm.State()
|
||||||
return c.srv.blockingRPC(&args.BlockingQuery,
|
return c.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("NodeServices"),
|
state.QueryTables("NodeServices"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
c.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
reply.Index, reply.NodeServices = state.NodeServices(args.Node)
|
reply.Index, reply.NodeServices = state.NodeServices(args.Node)
|
||||||
return reply.Index, nil
|
return reply.Index, nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -21,9 +21,9 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
|
||||||
// Get the state specific checks
|
// Get the state specific checks
|
||||||
state := h.srv.fsm.State()
|
state := h.srv.fsm.State()
|
||||||
return h.srv.blockingRPC(&args.BlockingQuery,
|
return h.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("ChecksInState"),
|
state.QueryTables("ChecksInState"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
h.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
|
reply.Index, reply.HealthChecks = state.ChecksInState(args.State)
|
||||||
return reply.Index, nil
|
return reply.Index, nil
|
||||||
})
|
})
|
||||||
|
@ -39,9 +39,9 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
|
||||||
// Get the node checks
|
// Get the node checks
|
||||||
state := h.srv.fsm.State()
|
state := h.srv.fsm.State()
|
||||||
return h.srv.blockingRPC(&args.BlockingQuery,
|
return h.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("NodeChecks"),
|
state.QueryTables("NodeChecks"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
h.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
|
reply.Index, reply.HealthChecks = state.NodeChecks(args.Node)
|
||||||
return reply.Index, nil
|
return reply.Index, nil
|
||||||
})
|
})
|
||||||
|
@ -63,9 +63,9 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
|
||||||
// Get the service checks
|
// Get the service checks
|
||||||
state := h.srv.fsm.State()
|
state := h.srv.fsm.State()
|
||||||
return h.srv.blockingRPC(&args.BlockingQuery,
|
return h.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("ServiceChecks"),
|
state.QueryTables("ServiceChecks"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
h.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
|
reply.Index, reply.HealthChecks = state.ServiceChecks(args.ServiceName)
|
||||||
return reply.Index, nil
|
return reply.Index, nil
|
||||||
})
|
})
|
||||||
|
@ -85,9 +85,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
||||||
// Get the nodes
|
// Get the nodes
|
||||||
state := h.srv.fsm.State()
|
state := h.srv.fsm.State()
|
||||||
err := h.srv.blockingRPC(&args.BlockingQuery,
|
err := h.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("CheckServiceNodes"),
|
state.QueryTables("CheckServiceNodes"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
h.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
if args.TagFilter {
|
if args.TagFilter {
|
||||||
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
|
reply.Index, reply.Nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -51,9 +51,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
||||||
// Get the local state
|
// Get the local state
|
||||||
state := k.srv.fsm.State()
|
state := k.srv.fsm.State()
|
||||||
return k.srv.blockingRPC(&args.BlockingQuery,
|
return k.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("KVSGet"),
|
state.QueryTables("KVSGet"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
k.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
index, ent, err := state.KVSGet(args.Key)
|
index, ent, err := state.KVSGet(args.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -84,9 +84,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
||||||
// Get the local state
|
// Get the local state
|
||||||
state := k.srv.fsm.State()
|
state := k.srv.fsm.State()
|
||||||
return k.srv.blockingRPC(&args.BlockingQuery,
|
return k.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
&reply.QueryMeta,
|
||||||
state.QueryTables("KVSList"),
|
state.QueryTables("KVSList"),
|
||||||
func() (uint64, error) {
|
func() (uint64, error) {
|
||||||
k.srv.setQueryMeta(&reply.QueryMeta)
|
|
||||||
index, ent, err := state.KVSList(args.Key)
|
index, ent, err := state.KVSList(args.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|
|
@ -203,7 +203,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
|
||||||
|
|
||||||
// blockingRPC is used for queries that need to wait for a
|
// blockingRPC is used for queries that need to wait for a
|
||||||
// minimum index. This is used to block and wait for changes.
|
// minimum index. This is used to block and wait for changes.
|
||||||
func (s *Server) blockingRPC(b *structs.BlockingQuery, tables MDBTables, run func() (uint64, error)) error {
|
func (s *Server) blockingRPC(b *structs.BlockingQuery, m *structs.QueryMeta,
|
||||||
|
tables MDBTables, run func() (uint64, error)) error {
|
||||||
var timeout <-chan time.Time
|
var timeout <-chan time.Time
|
||||||
var notifyCh chan struct{}
|
var notifyCh chan struct{}
|
||||||
|
|
||||||
|
@ -239,8 +240,11 @@ SETUP_NOTIFY:
|
||||||
s.fsm.State().Watch(tables, notifyCh)
|
s.fsm.State().Watch(tables, notifyCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the query function
|
|
||||||
RUN_QUERY:
|
RUN_QUERY:
|
||||||
|
// Update the query meta data
|
||||||
|
s.setQueryMeta(m)
|
||||||
|
|
||||||
|
// Run the query function
|
||||||
idx, err := run()
|
idx, err := run()
|
||||||
|
|
||||||
// Check for minimum query time
|
// Check for minimum query time
|
||||||
|
|
Loading…
Reference in New Issue