From 897b953f6621f6d120f2f4cf875704f71b846bf5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 12 Jan 2022 17:28:06 -0500 Subject: [PATCH 1/3] Add a test for blocking query on non-existent entry This test shows how blocking queries are not efficient when the query returns no results. The test fails with 100+ calls instead of the expected 2. This test is still a bit flaky because it depends on the timing of the writes. It can sometimes return 3 calls. A future commit should fix this and make blocking queries even more optimal for not-found results. --- agent/consul/config_endpoint_test.go | 63 ++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index fce29efe3f..169d864795 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "os" "sort" @@ -9,6 +10,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -302,6 +304,67 @@ func TestConfigEntry_Get(t *testing.T) { require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) } +func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + codec := rpcClient(t, s1) + store := s1.fsm.State() + + entry := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "alpha", + } + require.NoError(t, store.EnsureConfigEntry(1, entry)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "does-not-exist", + } + args.QueryOptions.MaxQueryTime = time.Second + + for 
ctx.Err() == nil { + var out structs.ConfigEntryResponse + + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out.Entry) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := uint64(0); i < 200; i++ { + time.Sleep(5 * time.Millisecond) + entry := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf("other%d", i), + } + if err := store.EnsureConfigEntry(i+2, entry); err != nil { + return err + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + require.Equal(t, 2, count) +} + func TestConfigEntry_Get_ACLDeny(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") From 4b33bdf3966f1228b8b53441269c96a19e6b1d12 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 12 Jan 2022 19:44:09 -0500 Subject: [PATCH 2/3] Make blockingQuery efficient with 'not found' results. By using the query results as state. Blocking queries are efficient when the query matches some results, because the ModifyIndex of those results, returned as queryMeta.Index, will never change unless the items themselves change. Blocking queries for non-existent items are not efficient because the queryMeta.Index can (and often does) change when other entities are written. This commit reduces the churn of these queries by using a different comparison for "has changed". Instead of using the modified index, we use the existence of the results. If the previous result was "not found" and the new result is still "not found", we know we can ignore the modified index and continue to block. This is done by setting the minQueryIndex to the returned queryMeta.Index, which prevents the query from returning before a state change is observed. 
--- .changelog/12110.txt | 3 + agent/consul/config_endpoint.go | 2 +- agent/consul/config_endpoint_test.go | 6 +- agent/consul/rpc.go | 17 ++++- agent/consul/rpc_test.go | 93 ++++++++++++++++++++++++++-- 5 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 .changelog/12110.txt diff --git a/.changelog/12110.txt b/.changelog/12110.txt new file mode 100644 index 0000000000..cca31962d7 --- /dev/null +++ b/.changelog/12110.txt @@ -0,0 +1,3 @@ +```release-note:improvement +rpc: improve blocking queries for items that do not exist, by continuing to block until they exist (or the timeout). +``` diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 44b2119629..419e0afefe 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -207,7 +207,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE reply.Index = index if entry == nil { - return nil + return errNotFound } reply.Entry = entry diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 169d864795..527ba02721 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -362,7 +362,11 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { }) require.NoError(t, g.Wait()) - require.Equal(t, 2, count) + // The test is a bit racy because of the timing of the two goroutines, so + // we relax the check for the count to be within a small range. 
+ if count < 2 || count > 3 { + t.Fatalf("expected count to be 2 or 3, got %d", count) + } } func TestConfigEntry_Get_ACLDeny(t *testing.T) { diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 2bec441e1c..2c15afe5da 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -983,6 +983,9 @@ func (s *Server) blockingQuery( var ws memdb.WatchSet err := query(ws, s.fsm.State()) s.setQueryMeta(responseMeta, opts.GetToken()) + if errors.Is(err, errNotFound) { + return nil + } return err } @@ -995,6 +998,8 @@ func (s *Server) blockingQuery( // decrement the count when the function returns. defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) + var notFound bool + for { if opts.GetRequireConsistent() { if err := s.consistentRead(); err != nil { @@ -1014,7 +1019,15 @@ func (s *Server) blockingQuery( err := query(ws, state) s.setQueryMeta(responseMeta, opts.GetToken()) - if err != nil { + switch { + case errors.Is(err, errNotFound): + if notFound { + // query result has not changed + minQueryIndex = responseMeta.GetIndex() + } + + notFound = true + case err != nil: return err } @@ -1037,6 +1050,8 @@ func (s *Server) blockingQuery( } } +var errNotFound = fmt.Errorf("no data found for query") + // setQueryMeta is used to populate the QueryMeta data for an RPC call // // Note: This method must be called *after* filtering query results with ACLs. diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index cc3673fff6..df08627beb 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -227,11 +227,9 @@ func (m *MockSink) Close() error { return nil } -func TestRPC_blockingQuery(t *testing.T) { +func TestServer_blockingQuery(t *testing.T) { t.Parallel() - dir, s := testServer(t) - defer os.RemoveAll(dir) - defer s.Shutdown() + _, s := testServerWithConfig(t) // Perform a non-blocking query. 
Note that it's significant that the meta has // a zero index in response - the implied opts.MinQueryIndex is also zero but @@ -391,6 +389,93 @@ func TestRPC_blockingQuery(t *testing.T) { require.NoError(t, err) require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls") }) + + t.Run("non-blocking query for item that does not exist", func(t *testing.T) { + opts := structs.QueryOptions{} + meta := structs.QueryMeta{} + calls := 0 + fn := func(_ memdb.WatchSet, _ *state.Store) error { + calls++ + return errNotFound + } + + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) + + t.Run("blocking query for item that does not exist", func(t *testing.T) { + opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond} + meta := structs.QueryMeta{} + calls := 0 + fn := func(ws memdb.WatchSet, _ *state.Store) error { + calls++ + if calls == 1 { + meta.Index = 3 + + ch := make(chan struct{}) + close(ch) + ws.Add(ch) + return errNotFound + } + meta.Index = 5 + return errNotFound + } + + err := s.blockingQuery(&opts, &meta, fn) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) + + t.Run("blocking query for item that existed and is removed", func(t *testing.T) { + opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond} + meta := structs.QueryMeta{} + calls := 0 + fn := func(ws memdb.WatchSet, _ *state.Store) error { + calls++ + if calls == 1 { + meta.Index = 3 + + ch := make(chan struct{}) + close(ch) + ws.Add(ch) + return nil + } + meta.Index = 5 + return errNotFound + } + + start := time.Now() + err := s.blockingQuery(&opts, &meta, fn) + require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out") + require.NoError(t, err) + require.Equal(t, 2, calls) + }) + + t.Run("blocking query for non-existent item that is created", func(t *testing.T) { + opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 
100 * time.Millisecond} + meta := structs.QueryMeta{} + calls := 0 + fn := func(ws memdb.WatchSet, _ *state.Store) error { + calls++ + if calls == 1 { + meta.Index = 3 + + ch := make(chan struct{}) + close(ch) + ws.Add(ch) + return errNotFound + } + meta.Index = 5 + return nil + } + + start := time.Now() + err := s.blockingQuery(&opts, &meta, fn) + require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out") + require.NoError(t, err) + require.Equal(t, 2, calls) + }) } func TestRPC_ReadyForConsistentReads(t *testing.T) { From 8a6e75ac815a5bdfd9fdbd3528c985d5fdde9c1b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 25 Jan 2022 18:41:15 -0500 Subject: [PATCH 3/3] rpc: add errNotFound to all Get queries Any query that returns a list of items is not part of this commit. --- agent/consul/acl_endpoint.go | 21 ++++++++++++++++----- agent/consul/coordinate_endpoint.go | 1 + agent/consul/federation_state_endpoint.go | 2 +- agent/consul/kvs_endpoint.go | 15 +++++---------- agent/consul/session_endpoint.go | 1 + 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index 13799c3182..d19836466f 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -322,6 +322,9 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke reply.Index, reply.Token = index, token reply.SourceDatacenter = args.Datacenter + if token == nil { + return errNotFound + } return nil }) } @@ -1045,6 +1048,9 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo } reply.Index, reply.Policy = index, policy + if policy == nil { + return errNotFound + } return nil }) } @@ -1428,6 +1434,9 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe } reply.Index, reply.Role = index, role + if role == nil { + return errNotFound + } return nil }) } @@ -1795,12 +1804,14 @@ func (a *ACL) BindingRuleRead(args 
*structs.ACLBindingRuleGetRequest, reply *str return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { index, rule, err := state.ACLBindingRuleGetByID(ws, args.BindingRuleID, &args.EnterpriseMeta) - if err != nil { return err } reply.Index, reply.BindingRule = index, rule + if rule == nil { + return errNotFound + } return nil }) } @@ -2052,16 +2063,16 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { index, method, err := state.ACLAuthMethodGetByName(ws, args.AuthMethodName, &args.EnterpriseMeta) - if err != nil { return err } - if method != nil { - _ = a.enterpriseAuthMethodTypeValidation(method.Type) + reply.Index, reply.AuthMethod = index, method + if method == nil { + return errNotFound } - reply.Index, reply.AuthMethod = index, method + _ = a.enterpriseAuthMethodTypeValidation(method.Type) return nil }) } diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index b35d8b2609..26aaa8536f 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -267,6 +267,7 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde }) } reply.Index, reply.Coordinates = index, coords + return nil }) } diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index da1171a8b7..ac02ee469a 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -124,7 +124,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs reply.Index = index if fedState == nil { - return nil + return errNotFound } reply.State = fedState diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index 2023ea1ea7..9befae5d6a 100644 --- a/agent/consul/kvs_endpoint.go +++ 
b/agent/consul/kvs_endpoint.go @@ -160,18 +160,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er } if ent == nil { - // Must provide non-zero index to prevent blocking - // Index 1 is impossible anyways (due to Raft internals) - if index == 0 { - reply.Index = 1 - } else { - reply.Index = index - } + reply.Index = index reply.Entries = nil - } else { - reply.Index = ent.ModifyIndex - reply.Entries = structs.DirEntries{ent} + return errNotFound } + + reply.Index = ent.ModifyIndex + reply.Entries = structs.DirEntries{ent} return nil }) } diff --git a/agent/consul/session_endpoint.go b/agent/consul/session_endpoint.go index ae39a6fc54..bb65938fef 100644 --- a/agent/consul/session_endpoint.go +++ b/agent/consul/session_endpoint.go @@ -198,6 +198,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, reply.Sessions = structs.Sessions{session} } else { reply.Sessions = nil + return errNotFound } s.srv.filterACLWithAuthorizer(authz, reply) return nil