diff --git a/.changelog/10299.txt b/.changelog/10299.txt new file mode 100644 index 0000000000..df91b2f31c --- /dev/null +++ b/.changelog/10299.txt @@ -0,0 +1,4 @@ +```release-note:bug +use the MaxQueryTime instead of RPCHoldTimeout for blocking RPC queries + [[GH-8978](https://github.com/hashicorp/consul/pull/8978)]. +``` diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index 6954699e9d..b19b904308 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1665,7 +1665,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) { defer codec.Close() // Run the query, do not wait for leader, never any contact with leader, should fail - if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err.Error() != structs.ErrNoLeader.Error() { t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out) } @@ -1677,6 +1677,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) { testrpc.WaitForLeader(t, s2.RPC, "dc1") retry.Run(t, func(r *retry.R) { + out = structs.IndexedServices{} if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { r.Fatalf("err: %v", err) } @@ -1696,24 +1697,25 @@ func TestCatalog_ListServices_Stale(t *testing.T) { args.AllowStale = false // Since the leader is now down, non-stale query should fail now - if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() { + out = structs.IndexedServices{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err.Error() != structs.ErrLeaderNotTracked.Error() { t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out) } + if out.KnownLeader { + t.Fatalf("should not have a leader anymore: %#v", out) + } // With stale, request should still work args.AllowStale = true - if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { - t.Fatalf("err: %v", err) - } - - // Should find old service - if len(out.Services) != 1 { - t.Fatalf("bad: %#v", out) - } - - if out.KnownLeader { - t.Fatalf("should not have a leader anymore: %#v", out) - } + retry.Run(t, func(r *retry.R) { + out = structs.IndexedServices{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { + r.Fatalf("err: %v", err) + } + if out.KnownLeader || len(out.Services) != 1 { + r.Fatalf("got %t nodes want %d", out.KnownLeader, len(out.Services)) + } + }) } func TestCatalog_ListServiceNodes(t *testing.T) { diff --git a/agent/consul/client.go b/agent/consul/client.go index d71c81519b..d09dd34c73 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -285,18 +285,16 @@ TRY: // Use the zero value for RPCInfo if the request doesn't implement RPCInfo info, _ := args.(structs.RPCInfo) - if retry := canRetry(info, rpcErr); !retry { + if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry { return rpcErr } // We can wait a bit and retry! - if time.Since(firstCheck) < c.config.RPCHoldTimeout { - jitter := lib.RandomStagger(c.config.RPCHoldTimeout / jitterFraction) - select { - case <-time.After(jitter): - goto TRY - case <-c.shutdownCh: - } + jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) + select { + case <-time.After(jitter): + goto TRY + case <-c.shutdownCh: } return rpcErr } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 0766afe2e7..1a8f22e3a0 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -75,12 +75,6 @@ var RPCSummaries = []prometheus.SummaryDefinition{ } const ( - // jitterFraction is a the limit to the amount of jitter we apply - // to a user specified MaxQueryTime. We divide the specified time by - // the fraction. So 16 == 6.25% limit of jitter. This same fraction - // is applied to the RPCHoldTimeout - jitterFraction = 16 - // Warn if the Raft command is larger than this. // If it's over 1MB something is probably being abusive. raftWarnSize = 1024 * 1024 @@ -526,7 +520,14 @@ func (c *limitedConn) Read(b []byte) (n int, err error) { } // canRetry returns true if the request and error indicate that a retry is safe. -func canRetry(info structs.RPCInfo, err error) bool { +func canRetry(info structs.RPCInfo, err error, start time.Time, config *Config) bool { + if info != nil && info.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime) { + // RPCInfo timeout may include extra time for MaxQueryTime + return false + } else if info == nil && time.Since(start) > config.RPCHoldTimeout { + // When not RPCInfo, timeout is only RPCHoldTimeout + return false + } // No leader errors are always safe to retry since no state could have // been changed. if structs.IsErrNoLeader(err) { @@ -545,16 +546,16 @@ func canRetry(info structs.RPCInfo, err error) bool { // ForwardRPC is used to forward an RPC request to a remote DC or to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) ForwardRPC(method string, req structs.RPCInfo, reply interface{}) (bool, error) { - var firstCheck time.Time +func (s *Server) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) { + firstCheck := time.Now() // Handle DC forwarding - dc := req.RequestDatacenter() + dc := info.RequestDatacenter() if dc != s.config.Datacenter { // Local tokens only work within the current datacenter. Check to see // if we are attempting to forward one to a remote datacenter and strip // it, falling back on the anonymous token on the other end. - if token := req.TokenSecret(); token != "" { + if token := info.TokenSecret(); token != "" { done, ident, err := s.ResolveIdentityFromToken(token) if done { if err != nil && !acl.IsErrNotFound(err) { @@ -562,18 +563,18 @@ func (s *Server) ForwardRPC(method string, req structs.RPCInfo, reply interface{ } if ident != nil && ident.IsLocal() { // Strip it from the request. - req.SetTokenSecret("") - defer req.SetTokenSecret(token) + info.SetTokenSecret("") + defer info.SetTokenSecret(token) } } } - err := s.forwardDC(method, dc, req, reply) + err := s.forwardDC(method, dc, info, reply) return true, err } // Check if we can allow a stale read, ensure our local DB is initialized - if req.IsRead() && req.AllowStaleRead() && !s.raft.LastContact().IsZero() { + if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() { return false, nil } @@ -596,20 +597,15 @@ CHECK_LEADER: // Handle the case of a known leader if leader != nil { rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr, - method, req, reply) - if rpcErr != nil && canRetry(req, rpcErr) { - goto RETRY + method, info, reply) + if rpcErr == nil { + return true, nil } - return true, rpcErr } -RETRY: - // Gate the request until there is a leader - if firstCheck.IsZero() { - firstCheck = time.Now() - } - if time.Since(firstCheck) < s.config.RPCHoldTimeout { - jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) + if retry := canRetry(info, rpcErr, firstCheck, s.config); retry { + // Gate the request until there is a leader + jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) select { case <-time.After(jitter): goto CHECK_LEADER @@ -832,7 +828,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s } // Apply a small amount of jitter to the request. - queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction) + queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) // wrap the base context with a deadline ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout)) @@ -933,7 +929,7 @@ func (s *Server) consistentRead() error { if s.isReadyForConsistentReads() { return nil } - jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) + jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction) deadline := time.Now().Add(s.config.RPCHoldTimeout) for time.Now().Before(deadline) { diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 48981234be..a4dd44fa52 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -962,10 +962,17 @@ func TestCanRetry(t *testing.T) { req structs.RPCInfo err error expected bool + timeout time.Time } - + config := DefaultConfig() + now := time.Now() + config.RPCHoldTimeout = 7 * time.Second run := func(t *testing.T, tc testCase) { - require.Equal(t, tc.expected, canRetry(tc.req, tc.err)) + timeOutValue := tc.timeout + if timeOutValue.IsZero() { + timeOutValue = now + } + require.Equal(t, tc.expected, canRetry(tc.req, tc.err, timeOutValue, config)) } var testCases = []testCase{ @@ -990,6 +997,46 @@ func TestCanRetry(t *testing.T) { err: io.EOF, expected: true, }, + { + name: "EOF error", + req: &structs.DCSpecificRequest{}, + err: io.EOF, + expected: true, + }, + { + name: "HasTimedOut implementation with no error", + req: &structs.DCSpecificRequest{}, + err: nil, + expected: false, + }, + { + name: "HasTimedOut implementation timedOut with no error", + req: &structs.DCSpecificRequest{}, + err: nil, + expected: false, + timeout: now.Add(-(config.RPCHoldTimeout + time.Second)), + }, + { + name: "HasTimedOut implementation timedOut (with EOF error)", + req: &structs.DCSpecificRequest{}, + err: io.EOF, + expected: false, + timeout: now.Add(-(config.RPCHoldTimeout + time.Second)), + }, + { + name: "HasTimedOut implementation timedOut blocking call", + req: &structs.DCSpecificRequest{QueryOptions: structs.QueryOptions{MaxQueryTime: 300, MinQueryIndex: 1}}, + err: nil, + expected: false, + timeout: now.Add(-(config.RPCHoldTimeout + config.MaxQueryTime + time.Second)), + }, + { + name: "HasTimedOut implementation timedOut blocking call (MaxQueryTime not set)", + req: &structs.DCSpecificRequest{QueryOptions: structs.QueryOptions{MinQueryIndex: 1}}, + err: nil, + expected: false, + timeout: now.Add(-(config.RPCHoldTimeout + config.MaxQueryTime + time.Second)), + }, { name: "EOF on write request", err: io.EOF, @@ -1011,3 +1058,7 @@ type isReadRequest struct { func (r isReadRequest) IsRead() bool { return true } + +func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool { + return false +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 9f0aefa263..ae4b50d3e3 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -159,6 +159,12 @@ const ( // we multiply by time.Second lockDelayMinThreshold = 1000 + // JitterFraction is a the limit to the amount of jitter we apply + // to a user specified MaxQueryTime. We divide the specified time by + // the fraction. So 16 == 6.25% limit of jitter. This same fraction + // is applied to the RPCHoldTimeout + JitterFraction = 16 + // WildcardSpecifier is the string which should be used for specifying a wildcard // The exact semantics of the wildcard is left up to the code where its used. WildcardSpecifier = "*" @@ -193,6 +199,7 @@ type RPCInfo interface { AllowStaleRead() bool TokenSecret() string SetTokenSecret(string) + HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool } // QueryOptions is used to specify various flags for read queries @@ -291,6 +298,20 @@ func (q *QueryOptions) SetTokenSecret(s string) { q.Token = s } +func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool { + if q.MinQueryIndex > 0 { + if q.MaxQueryTime > maxQueryTime { + q.MaxQueryTime = maxQueryTime + } else if q.MaxQueryTime <= 0 { + q.MaxQueryTime = defaultQueryTime + } + q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction) + + return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout) + } + return time.Since(start) > rpcHoldTimeout +} + type WriteRequest struct { // Token is the ACL token ID. If not provided, the 'anonymous' // token is assumed for backwards compatibility. @@ -314,6 +335,10 @@ func (w *WriteRequest) SetTokenSecret(s string) { w.Token = s } +func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool { + return time.Since(start) > rpcHoldTimeout +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { diff --git a/lib/eof.go b/lib/eof.go index e29d151109..aadabe731e 100644 --- a/lib/eof.go +++ b/lib/eof.go @@ -16,6 +16,9 @@ var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error() // IsErrEOF returns true if we get an EOF error from the socket itself, or // an EOF equivalent error from yamux. func IsErrEOF(err error) bool { + if err == nil { + return false + } if errors.Is(err, io.EOF) { return true } diff --git a/proto/pbautoconf/auto_config.go b/proto/pbautoconf/auto_config.go index 00b788b613..8a3cce8f6e 100644 --- a/proto/pbautoconf/auto_config.go +++ b/proto/pbautoconf/auto_config.go @@ -1,5 +1,7 @@ package pbautoconf +import "time" + func (req *AutoConfigRequest) RequestDatacenter() string { return req.Datacenter } @@ -19,3 +21,7 @@ func (req *AutoConfigRequest) TokenSecret() string { func (req *AutoConfigRequest) SetTokenSecret(token string) { req.ConsulToken = token } + +func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool { + return time.Since(start) > rpcHoldTimeout +}