From 16b21b08645bc2f32f0b1e61ba437e660be1173b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 28 Jun 2021 16:25:49 -0400 Subject: [PATCH 1/4] http: add an X-Consul-Query-Backend header to responses So that it is easier to detect and test when streaming is being used. --- agent/http.go | 7 +++++++ agent/structs/protobuf_compat.go | 5 +++++ agent/structs/structs.go | 21 +++++++++++++++++++ proto/pbcommon/common.go | 6 ++++++ .../content/api-docs/features/blocking.mdx | 3 +++ 5 files changed, 42 insertions(+) diff --git a/agent/http.go b/agent/http.go index f6a8b448e4..ad2bbe9779 100644 --- a/agent/http.go +++ b/agent/http.go @@ -723,6 +723,13 @@ func setMeta(resp http.ResponseWriter, m structs.QueryMetaCompat) { setLastContact(resp, m.GetLastContact()) setKnownLeader(resp, m.GetKnownLeader()) setConsistency(resp, m.GetConsistencyLevel()) + setQueryBackend(resp, m.GetBackend()) +} + +func setQueryBackend(resp http.ResponseWriter, backend structs.QueryBackend) { + if b := backend.String(); b != "" { + resp.Header().Set("X-Consul-Query-Backend", b) + } } // setCacheMeta sets http response headers to indicate cache status. 
diff --git a/agent/structs/protobuf_compat.go b/agent/structs/protobuf_compat.go index 5322288264..667443c9e9 100644 --- a/agent/structs/protobuf_compat.go +++ b/agent/structs/protobuf_compat.go @@ -44,6 +44,7 @@ type QueryMetaCompat interface { SetIndex(uint64) GetConsistencyLevel() string SetConsistencyLevel(string) + GetBackend() QueryBackend } // GetToken helps implement the QueryOptionsCompat interface @@ -269,3 +270,7 @@ func (q *QueryMeta) SetIndex(index uint64) { func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) { q.ConsistencyLevel = consistencyLevel } + +func (q *QueryMeta) GetBackend() QueryBackend { + return q.Backend +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 93c816e4b6..a0d3e10338 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -339,6 +339,24 @@ func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, return time.Since(start) > rpcHoldTimeout } +type QueryBackend int + +const ( + QueryBackendBlocking QueryBackend = iota + QueryBackendStreaming +) + +func (q QueryBackend) String() string { + switch q { + case QueryBackendBlocking: + return "blocking-query" + case QueryBackendStreaming: + return "streaming" + default: + return "" + } +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { @@ -363,6 +381,9 @@ type QueryMeta struct { // When NotModified is true, the response will not contain the result of // the query. NotModified bool + + // Backend used to handle this query, either blocking-query or streaming. 
+ Backend QueryBackend } // RegisterRequest is used for the Catalog.Register endpoint diff --git a/proto/pbcommon/common.go b/proto/pbcommon/common.go index e6e981ccfa..eb396ae58e 100644 --- a/proto/pbcommon/common.go +++ b/proto/pbcommon/common.go @@ -2,6 +2,8 @@ package pbcommon import ( "time" + + "github.com/hashicorp/consul/agent/structs" ) // IsRead is always true for QueryOption @@ -97,6 +99,10 @@ func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) { q.ConsistencyLevel = consistencyLevel } +func (q *QueryMeta) GetBackend() structs.QueryBackend { + return structs.QueryBackend(0) +} + // WriteRequest only applies to writes, always false func (w WriteRequest) IsRead() bool { return false diff --git a/website/content/api-docs/features/blocking.mdx b/website/content/api-docs/features/blocking.mdx index 2b7a9ae860..4087c64d55 100644 --- a/website/content/api-docs/features/blocking.mdx +++ b/website/content/api-docs/features/blocking.mdx @@ -99,6 +99,9 @@ While streaming is a significant optimization over long polling, it will not pop `X-Consul-LastContact` or `X-Consul-KnownLeader` response headers, because the required data is not available to the client. +When the streaming backend is used, API responses will include the `X-Consul-Query-Backend` +header with a value of `streaming`. + ## Hash-based Blocking Queries From 8b365f827163971e087d6754e32ad1a7b8485cb2 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 10 Jun 2021 11:56:02 -0400 Subject: [PATCH 2/4] Remove a racy and failing test This test is super racy (it's not just a single line). This test also starts failing once streaming is enabled, because the cache rate limit no longer applies to the requests in the test. The queries use streaming instead of the cache. This test is no longer valid, and the functionality is already well tested by TestCacheThrottle. Instead of spending time rewriting this test, let's remove it. 
``` WARNING: DATA RACE Read at 0x00c01de410fc by goroutine 735: github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:1024 +0x9af github.com/hashicorp/consul/testrpc.WaitForTestAgent() /home/daniel/pers/code/consul/testrpc/wait.go:99 +0x209 github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:966 +0x1ad testing.tRunner() /usr/lib/go/src/testing/testing.go:1193 +0x202 Previous write at 0x00c01de410fc by goroutine 605: github.com/hashicorp/consul/agent.TestCacheRateLimit.func1.2() /home/daniel/pers/code/consul/agent/agent_test.go:998 +0xe9 Goroutine 735 (running) created at: testing.(*T).Run() /usr/lib/go/src/testing/testing.go:1238 +0x5d7 github.com/hashicorp/consul/agent.TestCacheRateLimit() /home/daniel/pers/code/consul/agent/agent_test.go:961 +0x375 testing.tRunner() /usr/lib/go/src/testing/testing.go:1193 +0x202 Goroutine 605 (finished) created at: github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:1022 +0x91e github.com/hashicorp/consul/testrpc.WaitForTestAgent() /home/daniel/pers/code/consul/testrpc/wait.go:99 +0x209 github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:966 +0x1ad testing.tRunner() /usr/lib/go/src/testing/testing.go:1193 +0x202 ``` --- agent/agent_test.go | 106 -------------------------------------------- 1 file changed, 106 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 584ea1eaae..12e282b7e1 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -18,7 +18,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "testing" "time" @@ -30,7 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "google.golang.org/grpc" "gopkg.in/square/go-jose.v2/jwt" @@ -937,110 +935,6 @@ func 
testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) { } } -func TestCacheRateLimit(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - tests := []struct { - // count := number of updates performed (1 every 10ms) - count int - // rateLimit rate limiting of cache - rateLimit float64 - // Minimum number of updates to see from a cache perspective - // We add a value with tolerance to work even on a loaded CI - minUpdates int - }{ - // 250 => we have a test running for at least 2.5s - {250, 0.5, 1}, - {250, 1, 1}, - {300, 2, 2}, - } - for _, currentTest := range tests { - t.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) { - tt := currentTest - t.Parallel() - a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }") - defer a.Shutdown() - testrpc.WaitForTestAgent(t, a.RPC, "dc1") - - cfg := a.config - require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate) - require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst) - cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit) - cfg.Cache.EntryFetchMaxBurst = 1 - a.reloadConfigInternal(cfg) - require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate) - require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst) - var wg sync.WaitGroup - stillProcessing := true - - injectService := func(i int) { - srv := &structs.NodeService{ - Service: "redis", - ID: "redis", - Port: 1024 + i, - Address: fmt.Sprintf("10.0.1.%d", i%255), - } - - err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote) - require.Nil(t, err) - } - - runUpdates := func() { - wg.Add(tt.count) - for i := 0; i < tt.count; i++ { - time.Sleep(10 * time.Millisecond) - injectService(i) - wg.Done() - } - stillProcessing = false - } - - getIndex := func(t *testing.T, oldIndex int) int { - req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil) - 
require.NoError(t, err) - - resp := httptest.NewRecorder() - a.srv.handler(false).ServeHTTP(resp, req) - // Key doesn't actually exist so we should get 404 - if got, want := resp.Code, http.StatusOK; got != want { - t.Fatalf("bad response code got %d want %d", got, want) - } - index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index")) - require.NoError(t, err) - return index - } - - { - start := time.Now() - injectService(0) - // Get the first index - index := getIndex(t, 0) - require.Greater(t, index, 2) - go runUpdates() - numberOfUpdates := 0 - for stillProcessing { - oldIndex := index - index = getIndex(t, oldIndex) - require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only") - numberOfUpdates++ - } - elapsed := time.Since(start) - qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed) - summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit) - - // We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock - require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary)) - // We must have at least being notified a few times - require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary)) - } - wg.Wait() - }) - } -} - func TestAddServiceIPv4TaggedDefault(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") From c78391797d6123baad57314bd1d1b323aa735b92 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 28 Jun 2021 16:48:10 -0400 Subject: [PATCH 3/4] streaming: fix enable of streaming in the client And add checks to all the tests that explicitly use streaming. 
--- .changelog/10514.txt | 3 +++ agent/agent.go | 1 + agent/health_endpoint_test.go | 42 ++++++++++++++++++++++------- agent/rpcclient/health/view.go | 3 ++- agent/rpcclient/health/view_test.go | 4 ++- agent/streaming_test.go | 1 + 6 files changed, 43 insertions(+), 11 deletions(-) create mode 100644 .changelog/10514.txt diff --git a/.changelog/10514.txt b/.changelog/10514.txt new file mode 100644 index 0000000000..f427fdcc26 --- /dev/null +++ b/.changelog/10514.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: fix a bug that was preventing streaming from being enabled. +``` diff --git a/agent/agent.go b/agent/agent.go index 14b474ef08..73d22894c8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) { Conn: conn, Logger: bd.Logger.Named("rpcclient.health"), }, + UseStreamingBackend: a.config.UseStreamingBackend, } a.serviceManager = NewServiceManager(&a) diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index b36c23fa43..075849eb80 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -739,11 +739,16 @@ func TestHealthServiceNodes(t *testing.T) { func TestHealthServiceNodes_Blocking(t *testing.T) { cases := []struct { - name string - hcl string - grpcMetrics bool + name string + hcl string + grpcMetrics bool + queryBackend string }{ - {name: "no streaming"}, + { + name: "no streaming", + queryBackend: "blocking-query", + hcl: `use_streaming_backend = false`, + }, { name: "streaming", grpcMetrics: true, @@ -751,6 +756,7 @@ func TestHealthServiceNodes_Blocking(t *testing.T) { rpc { enable_streaming = true } use_streaming_backend = true `, + queryBackend: "streaming", }, } @@ -856,6 +862,8 @@ use_streaming_backend = true require.True(t, idx < newIdx, "index should have increased."+ "idx=%d, newIdx=%d", idx, newIdx) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) + idx = newIdx checkErrs() @@ -882,6 +890,7 @@ 
use_streaming_backend = true newIdx := getIndex(t, resp) require.Equal(t, idx, newIdx) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) } if tc.grpcMetrics { @@ -905,16 +914,25 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { t.Parallel() tests := []struct { - name string - config string + name string + config string + queryBackend string }{ - {"normal", ""}, - {"cache-with-streaming", ` + { + name: "blocking-query", + config: `use_streaming_backend=false`, + queryBackend: "blocking-query", + }, + { + name: "cache-with-streaming", + config: ` rpc{ enable_streaming=true } use_streaming_backend=true - `}, + `, + queryBackend: "streaming", + }, } for _, tst := range tests { t.Run(tst.name, func(t *testing.T) { @@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { t.Fatalf("bad: %v", obj) } + + require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) }) } } @@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { // Should be a cache miss require.Equal(t, "MISS", resp.Header().Get("X-Cache")) + // always a blocking query, because the ingress endpoint does not yet support streaming. + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) })) require.True(t, t.Run("test caching hit", func(t *testing.T) { @@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { // Should be a cache HIT now! require.Equal(t, "HIT", resp.Header().Get("X-Cache")) + // always a blocking query, because the ingress endpoint does not yet support streaming. 
+ require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) })) } diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index a648686c45..2f31dde210 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -171,7 +171,8 @@ func (s *healthView) Result(index uint64) interface{} { result := structs.IndexedCheckServiceNodes{ Nodes: make(structs.CheckServiceNodes, 0, len(s.state)), QueryMeta: structs.QueryMeta{ - Index: index, + Index: index, + Backend: structs.QueryBackendStreaming, }, } for _, node := range s.state { diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index bdc59ad520..beba03d880 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -101,7 +101,8 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { empty := &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{}, QueryMeta: structs.QueryMeta{ - Index: 1, + Index: 1, + Backend: structs.QueryBackendStreaming, }, } @@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { result := &structs.IndexedCheckServiceNodes{} + result.QueryMeta.Backend = structs.QueryBackendStreaming for _, node := range nodes { result.Nodes = append(result.Nodes, structs.CheckServiceNode{ Node: &structs.Node{Node: node}, diff --git a/agent/streaming_test.go b/agent/streaming_test.go index 0f45ad9ed4..5fa4dd4c0d 100644 --- a/agent/streaming_test.go +++ b/agent/streaming_test.go @@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) { assertIndex(t, resp) require.NotEmpty(t, resp.Header().Get("X-Consul-Index")) + require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend")) } func TestGRPCWithTLSConfigs(t *testing.T) { From bc4d349ccf62bee5c3acdf294ce34868aa6c35bd Mon Sep 17 00:00:00 2001 From: Daniel 
Nephin Date: Mon, 28 Jun 2021 17:29:23 -0400 Subject: [PATCH 4/4] streaming: support X-Cache-Hit header If a value was already available in the local view, the request is considered a cache hit. If the materializer had to wait for a value, it is considered a cache miss. --- agent/rpcclient/health/health.go | 3 ++- agent/submatview/materializer.go | 5 +++++ agent/submatview/store.go | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 4cd7b0f4d1..9d20f3caa8 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -42,7 +42,8 @@ func (c *Client) ServiceNodes( if err != nil { return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err } - return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err + meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached} + return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err } out, md, err := c.getServiceNodes(ctx, req) diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 51402987dc..b830689e69 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -215,9 +215,13 @@ func (m *Materializer) notifyUpdateLocked(err error) { m.updateCh = make(chan struct{}) } +// Result returned from the View. type Result struct { Index uint64 Value interface{} + // Cached is true if the requested value was already available locally. If + // the value is false, it indicates that getFromView had to wait for an update. + Cached bool } // getFromView blocks until the index of the View is greater than opts.MinIndex, @@ -237,6 +241,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result // haven't loaded a snapshot at all yet which means we should wait for one on // the update chan. 
if result.Index > 0 && result.Index > minIndex { + result.Cached = true return result, nil } diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 58acf5db33..07363f7403 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -171,7 +171,7 @@ func (s *Store) Notify( u := cache.UpdateEvent{ CorrelationID: correlationID, Result: result.Value, - Meta: cache.ResultMeta{Index: result.Index}, + Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached}, } select { case updateCh <- u: