diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index cd2fc26522..248c0c82cc 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -7,6 +7,7 @@ import ( "io" "net" "strings" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -519,8 +520,12 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { var timeout *time.Timer + var queriesBlocking uint64 var queryTimeout time.Duration + // Instrument all queries run + metrics.IncrCounter([]string{"rpc", "query"}, 1) + minQueryIndex := queryOpts.GetMinQueryIndex() // Fast path right to the non-blocking query. if minQueryIndex == 0 { @@ -542,10 +547,20 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s timeout = time.NewTimer(queryTimeout) defer timeout.Stop() + // instrument blockingQueries + // atomic inc our server's count of in-flight blockingQueries and store the new value + queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1) + // atomic dec when we return from blockingQuery() + defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) + // set the gauge directly to the new value of s.blockingQueries + metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) + RUN_QUERY: + // Setup blocking loop // Update the query metadata. s.setQueryMeta(queryMeta) + // Validate // If the read must be consistent we verify that we are still the leader. if queryOpts.GetRequireConsistent() { if err := s.consistentRead(); err != nil { @@ -553,8 +568,7 @@ RUN_QUERY: } } - // Run the query. - metrics.IncrCounter([]string{"rpc", "query"}, 1) + // Run query // Operate on a consistent set of state. This makes sure that the // abandon channel goes with the state that the caller is using to @@ -571,7 +585,7 @@ RUN_QUERY: ws.Add(state.AbandonCh()) } - // Block up to the timeout if we didn't see anything fresh. + // Execute the queryFn err := fn(ws, state) // Note we check queryOpts.MinQueryIndex is greater than zero to determine if // blocking was requested by client, NOT meta.Index since the state function @@ -584,6 +598,7 @@ RUN_QUERY: if err == nil && queryMeta.GetIndex() < 1 { queryMeta.SetIndex(1) } + // block up to the timeout if we don't see anything fresh. if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { if expired := ws.Watch(timeout.C); !expired { // If a restore may have woken us up then bail out from @@ -594,6 +609,7 @@ RUN_QUERY: select { case <-state.AbandonCh(): default: + // loop back and look for an update again goto RUN_QUERY } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 332fa9aa8b..4b592ac944 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -110,6 +110,12 @@ var ( // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { + // queriesBlocking is a counter that we incr and decr atomically in + // rpc calls to provide telemetry on how many blocking queries are running. + // We interact with queriesBlocking atomically, do not move without ensuring it is + // correctly 64-byte aligned in the struct layout + queriesBlocking uint64 + // aclConfig is the configuration for the ACL system aclConfig *acl.Config diff --git a/website/source/docs/agent/telemetry.html.md b/website/source/docs/agent/telemetry.html.md index 7bb420eb3e..2f0766d90d 100644 --- a/website/source/docs/agent/telemetry.html.md +++ b/website/source/docs/agent/telemetry.html.md @@ -765,10 +765,16 @@ These metrics are used to monitor the health of the Consul servers.