diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index b536aad36b..52168b4dbb 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -938,19 +938,8 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s return err } - queryTimeout := queryOpts.GetMaxQueryTime() - // Restrict the max query time, and ensure there is always one. - if queryTimeout > s.config.MaxQueryTime { - queryTimeout = s.config.MaxQueryTime - } else if queryTimeout <= 0 { - queryTimeout = s.config.DefaultQueryTime - } - - // Apply a small amount of jitter to the request. - queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) - - // wrap the base context with a deadline - ctx, cancel := context.WithTimeout(ctx, queryTimeout) + timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime()) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() // instrument blockingQueries @@ -1068,6 +1057,22 @@ func (s *Server) consistentRead() error { return structs.ErrNotReadyForConsistentReads } +// rpcQueryTimeout calculates the timeout for the query, ensures it is +// constrained to the configured limit, and adds jitter to prevent multiple +// blocking queries from all timing out at the same time. +func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration { + // Restrict the max query time, and ensure there is always one. + if queryTimeout > s.config.MaxQueryTime { + queryTimeout = s.config.MaxQueryTime + } else if queryTimeout <= 0 { + queryTimeout = s.config.DefaultQueryTime + } + + // Apply a small amount of jitter to the request. + queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) + return queryTimeout +} + // maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the // request is unauthenticated, to limit information leaking. //