From d9537411b8c12633938fcadc7b5359f07b5c1024 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 26 Apr 2021 12:20:33 -0400 Subject: [PATCH] rpcclient/health: convert tests to the new submatview.Store interface Also fixes a minor data race in Materializer. Capture the error before releasing the lock. --- agent/rpcclient/health/streaming_test.go | 69 ++ agent/rpcclient/health/view_test.go | 803 +++++++++++++---------- agent/submatview/materializer.go | 3 +- agent/submatview/store.go | 10 +- 4 files changed, 527 insertions(+), 358 deletions(-) create mode 100644 agent/rpcclient/health/streaming_test.go diff --git a/agent/rpcclient/health/streaming_test.go b/agent/rpcclient/health/streaming_test.go new file mode 100644 index 0000000000..a55fac25b4 --- /dev/null +++ b/agent/rpcclient/health/streaming_test.go @@ -0,0 +1,69 @@ +package health + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/hashicorp/consul/proto/pbsubscribe" +) + +// streamClient is a mock StreamingClient for testing that allows +// for queueing up custom events to a subscriber. +type streamClient struct { + pbsubscribe.StateChangeSubscription_SubscribeClient + subFn func(*pbsubscribe.SubscribeRequest) error + events chan eventOrErr + ctx context.Context +} + +type eventOrErr struct { + Err error + Event *pbsubscribe.Event +} + +func newStreamClient(sub func(req *pbsubscribe.SubscribeRequest) error) *streamClient { + if sub == nil { + sub = func(*pbsubscribe.SubscribeRequest) error { + return nil + } + } + return &streamClient{ + events: make(chan eventOrErr, 32), + subFn: sub, + } +} + +func (t *streamClient) Subscribe( + ctx context.Context, + req *pbsubscribe.SubscribeRequest, + _ ...grpc.CallOption, +) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { + if err := t.subFn(req); err != nil { + return nil, err + } + t.ctx = ctx + return t, nil +} + +func (t *streamClient) QueueEvents(events ...*pbsubscribe.Event) { + for _, e := range events { + t.events <- eventOrErr{Event: e} + } +} + +func (t *streamClient) QueueErr(err error) { + t.events <- eventOrErr{Err: err} +} + +func (t *streamClient) Recv() (*pbsubscribe.Event, error) { + select { + case eoe := <-t.events: + if eoe.Err != nil { + return nil, eoe.Err + } + return eoe.Event, nil + case <-t.ctx.Done(): + return nil, t.ctx.Err() + } +} diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index deb0081309..bdc59ad520 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -1,248 +1,29 @@ package health import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/hashicorp/consul/agent/structs" -) - -/* - -import ( + "context" "errors" "fmt" "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/proto/pbcommon" + "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" ) -func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - namespace := pbcommon.DefaultEnterpriseMeta.Namespace - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} - - // Initially there are no services registered. Server should send an - // EndOfSnapshot message immediately with index of 1. - client.QueueEvents(newEndOfSnapshotEvent(1)) - - opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - } - empty := &structs.IndexedCheckServiceNodes{ - Nodes: structs.CheckServiceNodes{}, - QueryMeta: structs.QueryMeta{ - Index: 1, - }, - } - - runStep(t, "empty snapshot returned", func(t *testing.T) { - // Fetch should return an empty - // result of the right type with a non-zero index, and in the background begin - // streaming updates. - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - - require.Equal(t, uint64(1), result.Index) - require.Equal(t, empty, result.Value) - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "blocks for timeout", func(t *testing.T) { - // Subsequent fetch should block for the timeout - start := time.Now() - opts.Timeout = 200 * time.Millisecond - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until timeout") - - require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") - require.Equal(t, empty, result.Value, "result value should not have changed") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "blocks until update", func(t *testing.T) { - // Make another blocking query with a longer timeout and trigger an update - // event part way through. - start := time.Now() - go func() { - time.Sleep(200 * time.Millisecond) - client.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) - }() - - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until the event was delivered") - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, uint64(4), result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1, - "result value should contain the new registration") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) { - client.QueueErr(tempError("broken pipe")) - - // Next fetch will continue to block until timeout and receive the same - // result. - start := time.Now() - opts.Timeout = 200 * time.Millisecond - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until timeout") - - require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") - require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed") - - opts.MinIndex = result.Index - opts.LastResult = &result - - // But an update should still be noticed due to reconnection - client.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) - - start = time.Now() - opts.Timeout = time.Second - result, err = typ.Fetch(opts, req) - require.NoError(t, err) - elapsed = time.Since(start) - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, uint64(10), result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2, - "result value should contain the new registration") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { - // Wait and send the error while fetcher is waiting - go func() { - time.Sleep(200 * time.Millisecond) - client.QueueErr(errors.New("invalid request")) - }() - - // Next fetch should return the error - start := time.Now() - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) - require.Error(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until error was sent") - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") - // We don't require instances to be returned in same order so we use - // elementsMatch which is recursive. - requireResultsSame(t, - opts.LastResult.Value.(*structs.IndexedCheckServiceNodes), - result.Value.(*structs.IndexedCheckServiceNodes), - ) - - opts.MinIndex = result.Index - opts.LastResult = &result - - // But an update should still be noticed due to reconnection - client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web")) - - opts.Timeout = time.Second - result, err = typ.Fetch(opts, req) - require.NoError(t, err) - elapsed = time.Since(start) - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, - "result value should contain the new registration") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) -} - -type tempError string - -func (e tempError) Error() string { - return string(e) -} - -func (e tempError) Temporary() bool { - return true -} - -// requireResultsSame compares two IndexedCheckServiceNodes without requiring -// the same order of results (which vary due to map usage internally). -func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) { - require.Equal(t, want.Index, got.Index) - - svcIDs := func(csns structs.CheckServiceNodes) []string { - res := make([]string, 0, len(csns)) - for _, csn := range csns { - res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID)) - } - return res - } - - gotIDs := svcIDs(got.Nodes) - wantIDs := svcIDs(want.Nodes) - - require.ElementsMatch(t, wantIDs, gotIDs) -} - -// getNamespace returns a namespace if namespace support exists, otherwise -// returns the empty string. It allows the same tests to work in both oss and ent -// without duplicating the tests. -func getNamespace(ns string) string { - meta := structs.NewEnterpriseMeta(ns) - return meta.NamespaceOrEmpty() -} - -func TestOrderingConsistentWithMemDb(t *testing.T) { +func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) { index := uint64(42) buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode { newID, err := uuid.GenerateUUID() @@ -280,29 +61,205 @@ func TestOrderingConsistentWithMemDb(t *testing.T) { two := buildTestNode("node1", "testService:2") three := buildTestNode("node2", "testService") result := structs.IndexedCheckServiceNodes{ - Nodes: structs.CheckServiceNodes{ - three, two, zero, one, - }, - QueryMeta: structs.QueryMeta{ - Index: index, - }, + Nodes: structs.CheckServiceNodes{three, two, zero, one}, + QueryMeta: structs.QueryMeta{Index: index}, } sortCheckServiceNodes(&result) expected := structs.CheckServiceNodes{zero, one, two, three} require.Equal(t, expected, result.Nodes) } -func TestStreamingHealthServices_FullSnapshot(t *testing.T) { +func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace) + streamClient := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) + go store.Run(ctx) + + // Initially there are no services registered. Server should send an + // EndOfSnapshot message immediately with index of 1. + streamClient.QueueEvents(newEndOfSnapshotEvent(1)) + + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, + }, + }, + streamClient: streamClient, + } + empty := &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{}, + QueryMeta: structs.QueryMeta{ + Index: 1, + }, + } + + runStep(t, "empty snapshot returned", func(t *testing.T) { + result, err := store.Get(ctx, req) + require.NoError(t, err) + + require.Equal(t, uint64(1), result.Index) + require.Equal(t, empty, result.Value) + + req.QueryOptions.MinQueryIndex = result.Index + }) + + runStep(t, "blocks for timeout", func(t *testing.T) { + // Subsequent fetch should block for the timeout + start := time.Now() + req.QueryOptions.MaxQueryTime = 200 * time.Millisecond + result, err := store.Get(ctx, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed") + require.Equal(t, empty, result.Value, "result value should not have changed") + + req.QueryOptions.MinQueryIndex = result.Index + }) + + var lastResultValue structs.CheckServiceNodes + + runStep(t, "blocks until update", func(t *testing.T) { + // Make another blocking query with a longer timeout and trigger an update + // event part way through. + start := time.Now() + go func() { + time.Sleep(200 * time.Millisecond) + streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) + }() + + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until the event was delivered") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(4), result.Index, "result index should not have changed") + lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes + require.Len(t, lastResultValue, 1, + "result value should contain the new registration") + + req.QueryOptions.MinQueryIndex = result.Index + }) + + runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) { + streamClient.QueueErr(tempError("broken pipe")) + + // Next fetch will continue to block until timeout and receive the same + // result. + start := time.Now() + req.QueryOptions.MaxQueryTime = 200 * time.Millisecond + result, err := store.Get(ctx, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, + "result index should not have changed") + require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, + "result value should not have changed") + + req.QueryOptions.MinQueryIndex = result.Index + + // But an update should still be noticed due to reconnection + streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) + + start = time.Now() + req.QueryOptions.MaxQueryTime = time.Second + result, err = store.Get(ctx, req) + require.NoError(t, err) + elapsed = time.Since(start) + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(10), result.Index, "result index should not have changed") + lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes + require.Len(t, lastResultValue, 2, + "result value should contain the new registration") + + req.QueryOptions.MinQueryIndex = result.Index + }) + + runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { + // Wait and send the error while fetcher is waiting + go func() { + time.Sleep(200 * time.Millisecond) + streamClient.QueueErr(errors.New("invalid request")) + }() + + // Next fetch should return the error + start := time.Now() + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) + require.Error(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until error was sent") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed") + require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes) + + req.QueryOptions.MinQueryIndex = result.Index + + // But an update should still be noticed due to reconnection + streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web")) + + req.QueryOptions.MaxQueryTime = time.Second + result, err = store.Get(ctx, req) + require.NoError(t, err) + elapsed = time.Since(start) + require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed") + require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, + "result value should contain the new registration") + + req.QueryOptions.MinQueryIndex = result.Index + }) +} + +type tempError string + +func (e tempError) Error() string { + return string(e) +} + +func (e tempError) Temporary() bool { + return true +} + +func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } namespace := getNamespace("ns2") - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} + client := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) // Create an initial snapshot of 3 instances on different nodes registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event { @@ -314,37 +271,28 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { registerServiceWeb(5, 3), newEndOfSnapshotEvent(5)) - // This contains the view state so important we share it between calls. - opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: 1 * time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - } - - gatherNodes := func(res interface{}) []string { - nodes := make([]string, 0, 3) - r := res.(*structs.IndexedCheckServiceNodes) - for _, csn := range r.Nodes { - nodes = append(nodes, csn.Node.Node) - } - // Result will be sorted alphabetically the same way as memdb - return nodes + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, + }, + }, + streamClient: client, } runStep(t, "full snapshot returned", func(t *testing.T) { - result, err := typ.Fetch(opts, req) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - require.ElementsMatch(t, []string{"node1", "node2", "node3"}, - gatherNodes(result.Value)) + expected := newExpectedNodes("node1", "node2", "node3") + expected.Index = 5 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "blocks until deregistration", func(t *testing.T) { @@ -358,8 +306,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web")) }() - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) elapsed := time.Since(start) require.True(t, elapsed >= 200*time.Millisecond, @@ -368,10 +316,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { "Fetch should have returned before the timeout") require.Equal(t, uint64(20), result.Index) - require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node2", "node3") + expected.Index = 20 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "server reload is respected", func(t *testing.T) { @@ -389,18 +338,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { // Make another blocking query with THE SAME index. It should immediately // return the new snapshot. start := time.Now() - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) elapsed := time.Since(start) require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") require.Equal(t, uint64(50), result.Index) - require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node3", "node4", "node5") + expected.Index = 50 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) { @@ -414,26 +364,54 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { newEndOfSnapshotEvent(50)) start := time.Now() - opts.MinIndex = 49 - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MinQueryIndex = 49 + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) elapsed := time.Since(start) require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") require.Equal(t, uint64(50), result.Index) - require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node3", "node4", "node5") + expected.Index = 50 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) }) } -func TestStreamingHealthServices_EventBatches(t *testing.T) { +func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { + result := &structs.IndexedCheckServiceNodes{} + for _, node := range nodes { + result.Nodes = append(result.Nodes, structs.CheckServiceNode{ + Node: &structs.Node{Node: node}, + }) + } + return result +} + +// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode +// by Node name. +var cmpCheckServiceNodeNames = cmp.Options{ + cmp.Comparer(func(x, y structs.CheckServiceNode) bool { + return x.Node.Node == y.Node.Node + }), +} + +func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { + t.Helper() + if diff := cmp.Diff(x, y, opts...); diff != "" { + t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) + } +} + +func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { namespace := getNamespace("ns3") - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} + client := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( @@ -444,36 +422,28 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { batchEv, newEndOfSnapshotEvent(5)) - // This contains the view state so important we share it between calls. - opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: 1 * time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - } - - gatherNodes := func(res interface{}) []string { - nodes := make([]string, 0, 3) - r := res.(*structs.IndexedCheckServiceNodes) - for _, csn := range r.Nodes { - nodes = append(nodes, csn.Node.Node) - } - return nodes + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, + }, + }, + streamClient: client, } runStep(t, "full snapshot returned", func(t *testing.T) { - result, err := typ.Fetch(opts, req) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - require.ElementsMatch(t, []string{"node1", "node2", "node3"}, - gatherNodes(result.Value)) - opts.MinIndex = result.Index - opts.LastResult = &result + expected := newExpectedNodes("node1", "node2", "node3") + expected.Index = 5 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "batched updates work too", func(t *testing.T) { @@ -486,99 +456,226 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { newEventServiceHealthRegister(20, 4, "web"), ) client.QueueEvents(batchEv) - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(20), result.Index) - require.ElementsMatch(t, []string{"node2", "node3", "node4"}, - gatherNodes(result.Value)) + expected := newExpectedNodes("node2", "node3", "node4") + expected.Index = 20 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) } -func TestStreamingHealthServices_Filtering(t *testing.T) { +func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { namespace := getNamespace("ns3") - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} + streamClient := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{ + Filter: `Node.Node == "node2"`, + MaxQueryTime: time.Second, + }, + }, + }, + streamClient: streamClient, + } // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( newEventServiceHealthRegister(5, 1, "web"), newEventServiceHealthRegister(5, 2, "web"), newEventServiceHealthRegister(5, 3, "web")) - client.QueueEvents( + streamClient.QueueEvents( batchEv, newEndOfSnapshotEvent(5)) - // This contains the view state so important we share it between calls. - opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: 1 * time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - QueryOptions: structs.QueryOptions{ - Filter: `Node.Node == "node2"`, - }, - } - - gatherNodes := func(res interface{}) []string { - nodes := make([]string, 0, 3) - r := res.(*structs.IndexedCheckServiceNodes) - for _, csn := range r.Nodes { - nodes = append(nodes, csn.Node.Node) - } - return nodes - } - runStep(t, "filtered snapshot returned", func(t *testing.T) { - result, err := typ.Fetch(opts, req) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - require.Equal(t, []string{"node2"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node2") + expected.Index = 5 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "filtered updates work too", func(t *testing.T) { - // Simulate multiple registrations happening in one Txn (so all have same - // index) + // Simulate multiple registrations happening in one Txn (all have same index) batchEv := newEventBatchWithEvents( // Deregister an existing node newEventServiceHealthDeregister(20, 1, "web"), // Register another newEventServiceHealthRegister(20, 4, "web"), ) - client.QueueEvents(batchEv) - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + streamClient.QueueEvents(batchEv) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(20), result.Index) - require.Equal(t, []string{"node2"}, gatherNodes(result.Value)) - - opts.MinIndex = result.Index - opts.LastResult = &result + expected := newExpectedNodes("node2") + expected.Index = 20 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) }) } +// serviceRequestStub overrides NewMaterializer so that test can use a fake +// StreamClient. +type serviceRequestStub struct { + serviceRequest + streamClient submatview.StreamClient +} + +func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) { + view, err := newHealthView(r.ServiceSpecificRequest) + if err != nil { + return nil, err + } + return submatview.NewMaterializer(submatview.Deps{ + View: view, + Client: r.streamClient, + Logger: hclog.New(nil), + Request: newMaterializerRequest(r.ServiceSpecificRequest), + }), nil +} + +func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { + node := fmt.Sprintf("node%d", nodeNum) + nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) + addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) + + return &pbsubscribe.Event{ + Index: index, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + ID: nodeID, + Node: node, + Address: addr, + Datacenter: "dc1", + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &pbservice.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + }, + }, + }, + } +} + +func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { + node := fmt.Sprintf("node%d", nodeNum) + + return &pbsubscribe.Event{ + Index: index, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Deregister, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: node, + }, + Service: &pbservice.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: pbcommon.RaftIndex{ + // The original insertion index since a delete doesn't update + // this. This magic value came from state store tests where we + // setup at index 10 and then mutate at index 100. It can be + // modified by the caller later and makes it easier than having + // yet another argument in the common case. + CreateIndex: 10, + ModifyIndex: 10, + }, + }, + }, + }, + }, + } +} + +func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event { + events := make([]*pbsubscribe.Event, len(evs)+1) + events[0] = first + for i := range evs { + events[i+1] = evs[i] + } + return &pbsubscribe.Event{ + Index: first.Index, + Payload: &pbsubscribe.Event_EventBatch{ + EventBatch: &pbsubscribe.EventBatch{Events: events}, + }, + } +} + +func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { + return &pbsubscribe.Event{ + Index: index, + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + } +} + +func newNewSnapshotToFollowEvent() *pbsubscribe.Event { + return &pbsubscribe.Event{ + Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, + } +} + +// getNamespace returns a namespace if namespace support exists, otherwise +// returns the empty string. It allows the same tests to work in both oss and ent +// without duplicating the tests. +func getNamespace(ns string) string { + meta := structs.NewEnterpriseMeta(ns) + return meta.NamespaceOrEmpty() +} + +func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error { + return func(request *pbsubscribe.SubscribeRequest) error { + if request.Namespace != ns { + return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace) + } + return nil + } +} + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) { t.FailNow() } } -*/ func TestNewFilterEvaluator(t *testing.T) { type testCase struct { diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 94f922c61c..3e3381e74b 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -249,8 +249,9 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result result.Index = m.index if m.err != nil { + err := m.err m.lock.Unlock() - return result, m.err + return result, err } result.Value = m.view.Result(m.index) diff --git a/agent/submatview/store.go b/agent/submatview/store.go index ff684b48d0..b25653e95a 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -2,6 +2,7 @@ package submatview import ( "context" + "errors" "fmt" "sync" "time" @@ -98,10 +99,11 @@ func (s *Store) Get(ctx context.Context, req Request) (Result, error) { defer cancel() result, err := e.materializer.getFromView(ctx, info.MinIndex) - - // TODO: does context.DeadlineExceeded need to be translated into a nil error - // to match the old interface? - + // context.DeadlineExceeded is translated to nil to match the behaviour of + // agent/cache.Cache.Get. + if err == nil || errors.Is(err, context.DeadlineExceeded) { + return result, nil + } return result, err }