rpc: move the index defaulting to setQueryMeta.

This safeguard should be safe to apply in general. We are already
applying it to non-blocking queries that call blockingQuery, so it
should be fine to apply it to others.
This commit is contained in:
Daniel Nephin 2022-01-17 15:09:08 -05:00
parent 4b67d6c18b
commit fd0a9fd4f3
4 changed files with 19 additions and 15 deletions

View File

@ -984,18 +984,6 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
return err return err
} }
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
// blocking was requested by client, NOT meta.Index since the state function
// might return zero if something is not initialized and care wasn't taken to
// handle that special case (in practice this happened a lot so fixing it
// systematically here beats trying to remember to add zero checks in every
// state method). We also need to ensure that unless there is an error, we
// return an index > 0 otherwise the client will never block and burn CPU and
// requests.
if queryMeta.GetIndex() < 1 {
queryMeta.SetIndex(1)
}
if queryMeta.GetIndex() > minQueryIndex { if queryMeta.GetIndex() > minQueryIndex {
return nil return nil
} }
@ -1034,6 +1022,17 @@ func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) {
m.SetKnownLeader(s.raft.Leader() != "") m.SetKnownLeader(s.raft.Leader() != "")
} }
maskResultsFilteredByACLs(token, m) maskResultsFilteredByACLs(token, m)
// Always set a non-zero QueryMeta.Index. Generally we expect the
// QueryMeta.Index to be set to structs.RaftIndex.ModifyIndex. If the query
// returned no results we expect it to be set to the max index of the table,
// however we can't guarantee this always happens.
// To prevent a client from accidentally performing many non-blocking queries
// (which causes lots of unnecessary load), we always set a default value of 1.
// This is sufficient to prevent the unnecessary load in most cases.
if m.GetIndex() < 1 {
m.SetIndex(1)
}
} }
// consistentRead is used to ensure we do not perform a stale // consistentRead is used to ensure we do not perform a stale

View File

@ -821,6 +821,7 @@ func TestTxn_Read(t *testing.T) {
}, },
QueryMeta: structs.QueryMeta{ QueryMeta: structs.QueryMeta{
KnownLeader: true, KnownLeader: true,
Index: 1,
}, },
} }
require.Equal(expected, out) require.Equal(expected, out)

View File

@ -370,7 +370,9 @@ func (q QueryBackend) String() string {
// QueryMeta allows a query response to include potentially // QueryMeta allows a query response to include potentially
// useful metadata about a query // useful metadata about a query
type QueryMeta struct { type QueryMeta struct {
// Index in the raft log of the latest item returned by the query. // Index in the raft log of the latest item returned by the query. If the
// query did not return any results the Index will be a value that will
// change when a new item is added.
Index uint64 Index uint64
// If AllowStale is used, this is time elapsed since // If AllowStale is used, this is time elapsed since

View File

@ -10,11 +10,12 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
) )
func TestTxnEndpoint_Bad_JSON(t *testing.T) { func TestTxnEndpoint_Bad_JSON(t *testing.T) {
@ -385,6 +386,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
}, },
QueryMeta: structs.QueryMeta{ QueryMeta: structs.QueryMeta{
KnownLeader: true, KnownLeader: true,
Index: 1,
}, },
} }
assert.Equal(t, expected, txnResp) assert.Equal(t, expected, txnResp)