Merge pull request #10514 from hashicorp/dnephin/actually-enable-streaming

streaming: fix not being able to enable streaming
This commit is contained in:
Daniel Nephin 2021-06-28 18:52:03 -04:00 committed by GitHub
commit 1041ba71de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 93 additions and 119 deletions

3
.changelog/10514.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
streaming: fix a bug that was preventing streaming from being enabled.
```

View File

@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) {
Conn: conn,
Logger: bd.Logger.Named("rpcclient.health"),
},
UseStreamingBackend: a.config.UseStreamingBackend,
}
a.serviceManager = NewServiceManager(&a)

View File

@ -18,7 +18,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -30,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"gopkg.in/square/go-jose.v2/jwt"
@ -937,110 +935,6 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
}
}
func TestCacheRateLimit(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
tests := []struct {
// count := number of updates performed (1 every 10ms)
count int
// rateLimit rate limiting of cache
rateLimit float64
// Minimum number of updates to see from a cache perspective
// We add a value with tolerance to work even on a loaded CI
minUpdates int
}{
// 250 => we have a test running for at least 2.5s
{250, 0.5, 1},
{250, 1, 1},
{300, 2, 2},
}
for _, currentTest := range tests {
t.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
tt := currentTest
t.Parallel()
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
cfg := a.config
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
cfg.Cache.EntryFetchMaxBurst = 1
a.reloadConfigInternal(cfg)
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
var wg sync.WaitGroup
stillProcessing := true
injectService := func(i int) {
srv := &structs.NodeService{
Service: "redis",
ID: "redis",
Port: 1024 + i,
Address: fmt.Sprintf("10.0.1.%d", i%255),
}
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
}
runUpdates := func() {
wg.Add(tt.count)
for i := 0; i < tt.count; i++ {
time.Sleep(10 * time.Millisecond)
injectService(i)
wg.Done()
}
stillProcessing = false
}
getIndex := func(t *testing.T, oldIndex int) int {
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
require.NoError(t, err)
resp := httptest.NewRecorder()
a.srv.handler(false).ServeHTTP(resp, req)
// Key doesn't actually exist so we should get 404
if got, want := resp.Code, http.StatusOK; got != want {
t.Fatalf("bad response code got %d want %d", got, want)
}
index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
require.NoError(t, err)
return index
}
{
start := time.Now()
injectService(0)
// Get the first index
index := getIndex(t, 0)
require.Greater(t, index, 2)
go runUpdates()
numberOfUpdates := 0
for stillProcessing {
oldIndex := index
index = getIndex(t, oldIndex)
require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
numberOfUpdates++
}
elapsed := time.Since(start)
qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)
// We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock
require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
// We must have at least being notified a few times
require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
}
wg.Wait()
})
}
}
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -742,8 +742,13 @@ func TestHealthServiceNodes_Blocking(t *testing.T) {
name string
hcl string
grpcMetrics bool
queryBackend string
}{
{name: "no streaming"},
{
name: "no streaming",
queryBackend: "blocking-query",
hcl: `use_streaming_backend = false`,
},
{
name: "streaming",
grpcMetrics: true,
@ -751,6 +756,7 @@ func TestHealthServiceNodes_Blocking(t *testing.T) {
rpc { enable_streaming = true }
use_streaming_backend = true
`,
queryBackend: "streaming",
},
}
@ -856,6 +862,8 @@ use_streaming_backend = true
require.True(t, idx < newIdx, "index should have increased."+
"idx=%d, newIdx=%d", idx, newIdx)
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
idx = newIdx
checkErrs()
@ -882,6 +890,7 @@ use_streaming_backend = true
newIdx := getIndex(t, resp)
require.Equal(t, idx, newIdx)
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}
if tc.grpcMetrics {
@ -907,14 +916,23 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
tests := []struct {
name string
config string
queryBackend string
}{
{"normal", ""},
{"cache-with-streaming", `
{
name: "blocking-query",
config: `use_streaming_backend=false`,
queryBackend: "blocking-query",
},
{
name: "cache-with-streaming",
config: `
rpc{
enable_streaming=true
}
use_streaming_backend=true
`},
`,
queryBackend: "streaming",
},
}
for _, tst := range tests {
t.Run(tst.name, func(t *testing.T) {
@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
}
require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
}
}
@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
// always a blocking query, because the ingress endpoint does not yet support streaming.
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
}))
require.True(t, t.Run("test caching hit", func(t *testing.T) {
@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
// always a blocking query, because the ingress endpoint does not yet support streaming.
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
}))
}

View File

@ -723,6 +723,13 @@ func setMeta(resp http.ResponseWriter, m structs.QueryMetaCompat) {
setLastContact(resp, m.GetLastContact())
setKnownLeader(resp, m.GetKnownLeader())
setConsistency(resp, m.GetConsistencyLevel())
setQueryBackend(resp, m.GetBackend())
}
func setQueryBackend(resp http.ResponseWriter, backend structs.QueryBackend) {
if b := backend.String(); b != "" {
resp.Header().Set("X-Consul-Query-Backend", b)
}
}
// setCacheMeta sets http response headers to indicate cache status.

View File

@ -42,7 +42,8 @@ func (c *Client) ServiceNodes(
if err != nil {
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
}
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached}
return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err
}
out, md, err := c.getServiceNodes(ctx, req)

View File

@ -172,6 +172,7 @@ func (s *healthView) Result(index uint64) interface{} {
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendStreaming,
},
}
for _, node := range s.state {

View File

@ -102,6 +102,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
Backend: structs.QueryBackendStreaming,
},
}
@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
result := &structs.IndexedCheckServiceNodes{}
result.QueryMeta.Backend = structs.QueryBackendStreaming
for _, node := range nodes {
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
Node: &structs.Node{Node: node},

View File

@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) {
assertIndex(t, resp)
require.NotEmpty(t, resp.Header().Get("X-Consul-Index"))
require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend"))
}
func TestGRPCWithTLSConfigs(t *testing.T) {

View File

@ -44,6 +44,7 @@ type QueryMetaCompat interface {
SetIndex(uint64)
GetConsistencyLevel() string
SetConsistencyLevel(string)
GetBackend() QueryBackend
}
// GetToken helps implement the QueryOptionsCompat interface
@ -269,3 +270,7 @@ func (q *QueryMeta) SetIndex(index uint64) {
func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) {
q.ConsistencyLevel = consistencyLevel
}
func (q *QueryMeta) GetBackend() QueryBackend {
return q.Backend
}

View File

@ -339,6 +339,24 @@ func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime,
return time.Since(start) > rpcHoldTimeout
}
type QueryBackend int
const (
QueryBackendBlocking QueryBackend = iota
QueryBackendStreaming
)
func (q QueryBackend) String() string {
switch q {
case QueryBackendBlocking:
return "blocking-query"
case QueryBackendStreaming:
return "streaming"
default:
return ""
}
}
// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
@ -363,6 +381,9 @@ type QueryMeta struct {
// When NotModified is true, the response will not contain the result of
// the query.
NotModified bool
// Backend used to handle this query, either blocking-query or streaming.
Backend QueryBackend
}
// RegisterRequest is used for the Catalog.Register endpoint

View File

@ -215,9 +215,13 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{})
}
// Result returned from the View.
type Result struct {
Index uint64
Value interface{}
// Cached is true if the requested value was already available locally. If
// the value is false, it indicates that getFromView had to wait for an update,
Cached bool
}
// getFromView blocks until the index of the View is greater than opts.MinIndex,
@ -237,6 +241,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan.
if result.Index > 0 && result.Index > minIndex {
result.Cached = true
return result, nil
}

View File

@ -171,7 +171,7 @@ func (s *Store) Notify(
u := cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index},
Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached},
}
select {
case updateCh <- u:

View File

@ -2,6 +2,8 @@ package pbcommon
import (
"time"
"github.com/hashicorp/consul/agent/structs"
)
// IsRead is always true for QueryOption
@ -97,6 +99,10 @@ func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) {
q.ConsistencyLevel = consistencyLevel
}
func (q *QueryMeta) GetBackend() structs.QueryBackend {
return structs.QueryBackend(0)
}
// WriteRequest only applies to writes, always false
func (w WriteRequest) IsRead() bool {
return false

View File

@ -99,6 +99,9 @@ While streaming is a significant optimization over long polling, it will not pop
`X-Consul-LastContact` or `X-Consul-KnownLeader` response headers, because the required
data is not available to the client.
When the streaming backend is used, API responses will include the `X-Consul-Query-Backend`
header with a value of `streaming`.
## Hash-based Blocking Queries