From 25a4f3c83b15f90659c78e84f7f1794d2c8a09bb Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Wed, 24 Jun 2020 12:36:54 -0400 Subject: [PATCH] Allow cancelling blocking queries in response to shutting down. --- agent/agent.go | 13 +++++++++---- agent/consul/rpc.go | 15 ++++++++++----- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ae89fb936d..d1106c9c48 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4424,7 +4424,8 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura // If we are not blocking we can skip tracking and allocating - nil WatchSet // is still valid to call Add on and will just be a no op. var ws memdb.WatchSet - var timeout *time.Timer + var ctx context.Context = &lib.StopChannelContext{StopCh: a.shutdownCh} + shouldBlock := false if alwaysBlock || hash != "" { if wait == 0 { @@ -4435,7 +4436,11 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura } // Apply a small amount of jitter to the request. wait += lib.RandomStagger(wait / 16) - timeout = time.NewTimer(wait) + var cancel func() + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(wait)) + defer cancel() + + shouldBlock = true } for { @@ -4453,7 +4458,7 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura // WatchSet immediately returns false which would incorrectly cause this to // loop and repeat again, however we rely on the invariant that ws == nil // IFF timeout == nil in which case the Watch call is never invoked. - if timeout == nil || hash != curHash || ws.Watch(timeout.C) { + if !shouldBlock || hash != curHash || ws.WatchCtx(ctx) != nil { return curHash, curResp, err } // Watch returned false indicating a change was detected, loop and repeat @@ -4465,7 +4470,7 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura if syncPauseCh := a.SyncPausedCh(); syncPauseCh != nil { select { case <-syncPauseCh: - case <-timeout.C: + case <-ctx.Done(): } } } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 6edaa54a4d..938f1ee21e 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -1,6 +1,7 @@ package consul import ( + "context" "crypto/tls" "encoding/binary" "errors" @@ -752,7 +753,9 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { - var timeout *time.Timer + var cancel func() + var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh} + var queriesBlocking uint64 var queryTimeout time.Duration @@ -776,9 +779,9 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s // Apply a small amount of jitter to the request. queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction) - // Setup a query timeout. - timeout = time.NewTimer(queryTimeout) - defer timeout.Stop() + // wrap the base context with a deadline + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + defer cancel() // instrument blockingQueries // atomic inc our server's count of in-flight blockingQueries and store the new value @@ -833,7 +836,9 @@ RUN_QUERY: } // block up to the timeout if we don't see anything fresh. if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { - if expired := ws.Watch(timeout.C); !expired { + if err := ws.WatchCtx(ctx); err == nil { + // a non-nil error only occurs when the context is cancelled + // If a restore may have woken us up then bail out from // the query immediately. This is slightly race-ey since // this might have been interrupted for other reasons,