diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 52168b4dbb..6a50781708 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -942,13 +942,10 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // instrument blockingQueries - // atomic inc our server's count of in-flight blockingQueries and store the new value - queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1) - // atomic dec when we return from blockingQuery() + count := atomic.AddUint64(&s.queriesBlocking, 1) + metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count)) + // decrement the count when the function returns. defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) - // set the gauge directly to the new value of s.blockingQueries - metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) for { if queryOpts.GetRequireConsistent() { @@ -977,24 +974,17 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s return nil } - // block up to the timeout if we don't see anything fresh. - if err := ws.WatchCtx(ctx); err == nil { - // a non-nil error only occurs when the context is cancelled - - // If a restore may have woken us up then bail out from - // the query immediately. This is slightly race-ey since - // this might have been interrupted for other reasons, - // but it's OK to kick it back to the caller in either - // case. - select { - case <-state.AbandonCh(): - return nil - default: - } + // block until something changes, or the timeout + if err := ws.WatchCtx(ctx); err != nil { + // exit if we've reached the timeout, or other cancellation + return nil } - if ctx.Err() != nil { + // exit if the state store has been abandoned + select { + case <-state.AbandonCh(): return nil + default: } } }