From f92dc11002c9697dcf1bd30fd9995ab5ce8f13b4 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 12 Jan 2022 17:46:12 -0500 Subject: [PATCH 1/5] rpc: refactor blocking query To remove the TODO, and make it more readable. In general this reduces the scope of variables, making them easier to reason about. It also introduces more early returns so that we can see the flow from the structure of the function. --- agent/consul/rpc.go | 101 ++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index c8fa6846cd..ddf4299508 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -919,22 +919,26 @@ type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { - var cancel func() var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh} - var queriesBlocking uint64 - var queryTimeout time.Duration - - // Instrument all queries run metrics.IncrCounter([]string{"rpc", "query"}, 1) minQueryIndex := queryOpts.GetMinQueryIndex() - // Fast path right to the non-blocking query. + // Perform a non-blocking query if minQueryIndex == 0 { - goto RUN_QUERY + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } + } + + var ws memdb.WatchSet + err := fn(ws, s.fsm.State()) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + return err } - queryTimeout = queryOpts.GetMaxQueryTime() + queryTimeout := queryOpts.GetMaxQueryTime() // Restrict the max query time, and ensure there is always one. if queryTimeout > s.config.MaxQueryTime { queryTimeout = s.config.MaxQueryTime @@ -946,64 +950,57 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) // wrap the base context with a deadline - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + ctx, cancel := context.WithTimeout(ctx, queryTimeout) defer cancel() // instrument blockingQueries // atomic inc our server's count of in-flight blockingQueries and store the new value - queriesBlocking = atomic.AddUint64(&s.queriesBlocking, 1) + queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1) // atomic dec when we return from blockingQuery() defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) // set the gauge directly to the new value of s.blockingQueries metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) -RUN_QUERY: - // Setup blocking loop - - // Validate - // If the read must be consistent we verify that we are still the leader. - if queryOpts.GetRequireConsistent() { - if err := s.consistentRead(); err != nil { - return err + for { + if queryOpts.GetRequireConsistent() { + if err := s.consistentRead(); err != nil { + return err + } } - } - // Run query - - // Operate on a consistent set of state. This makes sure that the - // abandon channel goes with the state that the caller is using to - // build watches. - state := s.fsm.State() - - // We can skip all watch tracking if this isn't a blocking query. - var ws memdb.WatchSet - if minQueryIndex > 0 { - ws = memdb.NewWatchSet() + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state := s.fsm.State() + ws := memdb.NewWatchSet() // This channel will be closed if a snapshot is restored and the // whole state store is abandoned. ws.Add(state.AbandonCh()) - } - // Execute the queryFn - err := fn(ws, state) + err := fn(ws, state) + s.setQueryMeta(queryMeta, queryOpts.GetToken()) + if err != nil { + return err + } - // Update the query metadata. - s.setQueryMeta(queryMeta, queryOpts.GetToken()) + // Note we check queryOpts.MinQueryIndex is greater than zero to determine if + // blocking was requested by client, NOT meta.Index since the state function + // might return zero if something is not initialized and care wasn't taken to + // handle that special case (in practice this happened a lot so fixing it + // systematically here beats trying to remember to add zero checks in every + // state method). We also need to ensure that unless there is an error, we + // return an index > 0 otherwise the client will never block and burn CPU and + // requests. + if queryMeta.GetIndex() < 1 { + queryMeta.SetIndex(1) + } - // Note we check queryOpts.MinQueryIndex is greater than zero to determine if - // blocking was requested by client, NOT meta.Index since the state function - // might return zero if something is not initialized and care wasn't taken to - // handle that special case (in practice this happened a lot so fixing it - // systematically here beats trying to remember to add zero checks in every - // state method). We also need to ensure that unless there is an error, we - // return an index > 0 otherwise the client will never block and burn CPU and - // requests. - if err == nil && queryMeta.GetIndex() < 1 { - queryMeta.SetIndex(1) - } - // block up to the timeout if we don't see anything fresh. - if err == nil && minQueryIndex > 0 && queryMeta.GetIndex() <= minQueryIndex { + if queryMeta.GetIndex() > minQueryIndex { + return nil + } + + // block up to the timeout if we don't see anything fresh. if err := ws.WatchCtx(ctx); err == nil { // a non-nil error only occurs when the context is cancelled @@ -1014,13 +1011,15 @@ RUN_QUERY: // case. select { case <-state.AbandonCh(): + return nil default: - // loop back and look for an update again - goto RUN_QUERY } } + + if ctx.Err() != nil { + return nil + } } - return err } // setQueryMeta is used to populate the QueryMeta data for an RPC call From 4b67d6c18b16b5f59bb1151509362564e38e348a Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 14 Jan 2022 18:53:31 -0500 Subject: [PATCH 2/5] rpc: add subtests to blockingQuery test --- agent/consul/rpc_test.go | 70 +++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index bde9b4d9ea..0782cbadee 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -233,13 +233,10 @@ func TestRPC_blockingQuery(t *testing.T) { defer os.RemoveAll(dir) defer s.Shutdown() - require := require.New(t) - assert := assert.New(t) - // Perform a non-blocking query. Note that it's significant that the meta has // a zero index in response - the implied opts.MinQueryIndex is also zero but // this should not block still. - { + t.Run("non-blocking query", func(t *testing.T) { var opts structs.QueryOptions var meta structs.QueryMeta var calls int @@ -247,16 +244,13 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - if err := s.blockingQuery(&opts, &meta, fn); err != nil { - t.Fatalf("err: %v", err) - } - if calls != 1 { - t.Fatalf("bad: %d", calls) - } - } + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) // Perform a blocking query that gets woken up and loops around once. - { + t.Run("blocking query - single loop", func(t *testing.T) { opts := structs.QueryOptions{ MinQueryIndex: 3, } @@ -275,13 +269,10 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - if err := s.blockingQuery(&opts, &meta, fn); err != nil { - t.Fatalf("err: %v", err) - } - if calls != 2 { - t.Fatalf("bad: %d", calls) - } - } + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) // Perform a blocking query that returns a zero index from blocking func (e.g. // no state yet). This should still return an empty response immediately, but @@ -292,7 +283,7 @@ func TestRPC_blockingQuery(t *testing.T) { // covered by tests but eventually when hit in the wild causes blocking // clients to busy loop and burn CPU. This test ensure that blockingQuery // systematically does the right thing to prevent future bugs like that. - { + t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) { opts := structs.QueryOptions{ MinQueryIndex: 0, } @@ -311,9 +302,9 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - require.NoError(s.blockingQuery(&opts, &meta, fn)) - assert.Equal(1, calls) - assert.Equal(uint64(1), meta.Index, + require.NoError(t, s.blockingQuery(&opts, &meta, fn)) + assert.Equal(t, 1, calls) + assert.Equal(t, uint64(1), meta.Index, "expect fake index of 1 to force client to block on next update") // Simulate client making next request @@ -322,19 +313,19 @@ func TestRPC_blockingQuery(t *testing.T) { // This time we should block even though the func returns index 0 still t0 := time.Now() - require.NoError(s.blockingQuery(&opts, &meta, fn)) + require.NoError(t, s.blockingQuery(&opts, &meta, fn)) t1 := time.Now() - assert.Equal(2, calls) - assert.Equal(uint64(1), meta.Index, + assert.Equal(t, 2, calls) + assert.Equal(t, uint64(1), meta.Index, "expect fake index of 1 to force client to block on next update") - assert.True(t1.Sub(t0) > 20*time.Millisecond, + assert.True(t, t1.Sub(t0) > 20*time.Millisecond, "should have actually blocked waiting for timeout") - } + }) // Perform a query that blocks and gets interrupted when the state store // is abandoned. - { + t.Run("blocking query interrupted by abandonCh", func(t *testing.T) { opts := structs.QueryOptions{ MinQueryIndex: 3, } @@ -363,13 +354,10 @@ func TestRPC_blockingQuery(t *testing.T) { calls++ return nil } - if err := s.blockingQuery(&opts, &meta, fn); err != nil { - t.Fatalf("err: %v", err) - } - if calls != 1 { - t.Fatalf("bad: %d", calls) - } - } + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) { opts := structs.QueryOptions{ @@ -382,13 +370,13 @@ func TestRPC_blockingQuery(t *testing.T) { } err := s.blockingQuery(&opts, &meta, fn) - require.NoError(err) - require.False(meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be reset for unauthenticated calls") + require.NoError(t, err) + require.False(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be reset for unauthenticated calls") }) t.Run("ResultsFilteredByACLs is honored for authenticated calls", func(t *testing.T) { token, err := lib.GenerateUUID(nil) - require.NoError(err) + require.NoError(t, err) opts := structs.QueryOptions{ Token: token, @@ -400,8 +388,8 @@ func TestRPC_blockingQuery(t *testing.T) { } err = s.blockingQuery(&opts, &meta, fn) - require.NoError(err) - require.True(meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls") + require.NoError(t, err) + require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls") }) } From fd0a9fd4f3af209dd4ae47c3d9c8a53e677e797e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 17 Jan 2022 15:09:08 -0500 Subject: [PATCH 3/5] rpc: move the index defaulting to setQueryMeta. This safeguard should be safe to apply in general. We are already applying it to non-blocking queries that call blockingQuery, so it should be fine to apply it to others. --- agent/consul/rpc.go | 23 +++++++++++------------ agent/consul/txn_endpoint_test.go | 1 + agent/structs/structs.go | 4 +++- agent/txn_endpoint_test.go | 6 ++++-- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index ddf4299508..b536aad36b 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -984,18 +984,6 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s return err } - // Note we check queryOpts.MinQueryIndex is greater than zero to determine if - // blocking was requested by client, NOT meta.Index since the state function - // might return zero if something is not initialized and care wasn't taken to - // handle that special case (in practice this happened a lot so fixing it - // systematically here beats trying to remember to add zero checks in every - // state method). We also need to ensure that unless there is an error, we - // return an index > 0 otherwise the client will never block and burn CPU and - // requests. - if queryMeta.GetIndex() < 1 { - queryMeta.SetIndex(1) - } - if queryMeta.GetIndex() > minQueryIndex { return nil } @@ -1034,6 +1022,17 @@ func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) { m.SetKnownLeader(s.raft.Leader() != "") } maskResultsFilteredByACLs(token, m) + + // Always set a non-zero QueryMeta.Index. Generally we expect the + // QueryMeta.Index to be set to structs.RaftIndex.ModifyIndex. If the query + // returned no results we expect it to be set to the max index of the table, + // however we can't guarantee this always happens. + // To prevent a client from accidentally performing many non-blocking queries + // (which causes lots of unnecessary load), we always set a default value of 1. + // This is sufficient to prevent the unnecessary load in most cases. + if m.GetIndex() < 1 { + m.SetIndex(1) + } } // consistentRead is used to ensure we do not perform a stale diff --git a/agent/consul/txn_endpoint_test.go b/agent/consul/txn_endpoint_test.go index 9619dc8815..10178866a0 100644 --- a/agent/consul/txn_endpoint_test.go +++ b/agent/consul/txn_endpoint_test.go @@ -821,6 +821,7 @@ func TestTxn_Read(t *testing.T) { }, QueryMeta: structs.QueryMeta{ KnownLeader: true, + Index: 1, }, } require.Equal(expected, out) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index cb84f77d99..168c70efe1 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -370,7 +370,9 @@ func (q QueryBackend) String() string { // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { - // Index in the raft log of the latest item returned by the query. + // Index in the raft log of the latest item returned by the query. If the + // query did not return any results the Index will be a value that will + // change when a new item is added. Index uint64 // If AllowStale is used, this is time elapsed since diff --git a/agent/txn_endpoint_test.go b/agent/txn_endpoint_test.go index 1d6d3f01be..6b3a7c4680 100644 --- a/agent/txn_endpoint_test.go +++ b/agent/txn_endpoint_test.go @@ -10,11 +10,12 @@ import ( "testing" "time" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/assert" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/raft" - "github.com/stretchr/testify/assert" ) func TestTxnEndpoint_Bad_JSON(t *testing.T) { @@ -385,6 +386,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) { }, QueryMeta: structs.QueryMeta{ KnownLeader: true, + Index: 1, }, } assert.Equal(t, expected, txnResp) From 72a733bed829e8fb03472c9efd36c9a5ffd3300a Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 17 Jan 2022 16:33:17 -0500 Subject: [PATCH 4/5] rpc: extract rpcQueryTimeout method This helps keep the logic in blockingQuery more focused. In the future we may have a separate struct for RPC queries which may allow us to move this off of Server. --- agent/consul/rpc.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index b536aad36b..52168b4dbb 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -938,19 +938,8 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s return err } - queryTimeout := queryOpts.GetMaxQueryTime() - // Restrict the max query time, and ensure there is always one. - if queryTimeout > s.config.MaxQueryTime { - queryTimeout = s.config.MaxQueryTime - } else if queryTimeout <= 0 { - queryTimeout = s.config.DefaultQueryTime - } - - // Apply a small amount of jitter to the request. - queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) - - // wrap the base context with a deadline - ctx, cancel := context.WithTimeout(ctx, queryTimeout) + timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime()) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() // instrument blockingQueries @@ -1068,6 +1057,22 @@ func (s *Server) consistentRead() error { return structs.ErrNotReadyForConsistentReads } +// rpcQueryTimeout calculates the timeout for the query, ensures it is +// constrained to the configured limit, and adds jitter to prevent multiple +// blocking queries from all timing out at the same time. +func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration { + // Restrict the max query time, and ensure there is always one. + if queryTimeout > s.config.MaxQueryTime { + queryTimeout = s.config.MaxQueryTime + } else if queryTimeout <= 0 { + queryTimeout = s.config.DefaultQueryTime + } + + // Apply a small amount of jitter to the request. + queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction) + return queryTimeout +} + // maskResultsFilteredByACLs blanks out the ResultsFilteredByACLs flag if the // request is unauthenticated, to limit information leaking. // From 71767f1b3ec02c6d17004f1a610209146d3a28b6 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 17 Jan 2022 16:43:56 -0500 Subject: [PATCH 5/5] rpc: cleanup exit and blocking condition logic in blockingQuery Remove some unnecessary comments around query_blocking metric. The only line that needs any comments in the atomic decrement. Cleanup the block and return comments and logic. The old comment about AbandonCh may have been relevant before, but it is expected behaviour now. The logic was simplified by inverting the err condition. --- agent/consul/rpc.go | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 52168b4dbb..6a50781708 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -942,13 +942,10 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // instrument blockingQueries - // atomic inc our server's count of in-flight blockingQueries and store the new value - queriesBlocking := atomic.AddUint64(&s.queriesBlocking, 1) - // atomic dec when we return from blockingQuery() + count := atomic.AddUint64(&s.queriesBlocking, 1) + metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(count)) + // decrement the count when the function returns. defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) - // set the gauge directly to the new value of s.blockingQueries - metrics.SetGauge([]string{"rpc", "queries_blocking"}, float32(queriesBlocking)) for { if queryOpts.GetRequireConsistent() { @@ -977,24 +974,17 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s return nil } - // block up to the timeout if we don't see anything fresh. - if err := ws.WatchCtx(ctx); err == nil { - // a non-nil error only occurs when the context is cancelled - - // If a restore may have woken us up then bail out from - // the query immediately. This is slightly race-ey since - // this might have been interrupted for other reasons, - // but it's OK to kick it back to the caller in either - // case. - select { - case <-state.AbandonCh(): - return nil - default: - } + // block until something changes, or the timeout + if err := ws.WatchCtx(ctx); err != nil { + // exit if we've reached the timeout, or other cancellation + return nil } - if ctx.Err() != nil { + // exit if the state store has been abandoned + select { + case <-state.AbandonCh(): return nil + default: } } }