From 132b76acef095b4787e026f73bf1b7f64b22752a Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 18 Sep 2020 18:25:56 -0400 Subject: [PATCH] agent/cache: Add cache-type and materialized view for streaming health Extracted from d97412ce4c399a35b41bbdae2716f0e32dce80bf Co-authored-by: Paul Banks --- .../cache-types/streaming_health_services.go | 149 ++++++ .../streaming_health_services_test.go | 485 ++++++++++++++++++ .../streaming_materialized_view.go | 447 ++++++++++++++++ agent/cache-types/streaming_test.go | 67 +++ agent/cache/cache.go | 7 + agent/cache/cache_test.go | 44 ++ 6 files changed, 1199 insertions(+) create mode 100644 agent/cache-types/streaming_health_services.go create mode 100644 agent/cache-types/streaming_health_services_test.go create mode 100644 agent/cache-types/streaming_materialized_view.go create mode 100644 agent/cache-types/streaming_test.go diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go new file mode 100644 index 0000000000..2be1b40e92 --- /dev/null +++ b/agent/cache-types/streaming_health_services.go @@ -0,0 +1,149 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" +) + +const ( + // Recommended name for registration. + StreamingHealthServicesName = "streaming-health-services" +) + +// StreamingHealthServices supports fetching discovering service instances via the +// catalog using the streaming gRPC endpoint. +type StreamingHealthServices struct { + client StreamingClient + logger hclog.Logger +} + +// NewStreamingHealthServices creates a cache-type for watching for service +// health results via streaming updates. +func NewStreamingHealthServices(client StreamingClient, logger hclog.Logger) *StreamingHealthServices { + return &StreamingHealthServices{ + client: client, + logger: logger, + } +} + +// Fetch implements cache.Type +func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + // The request should be a ServiceSpecificRequest. + reqReal, ok := req.(*structs.ServiceSpecificRequest) + if !ok { + return cache.FetchResult{}, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + r := agentpb.SubscribeRequest{ + Topic: agentpb.Topic_ServiceHealth, + Key: reqReal.ServiceName, + Token: reqReal.Token, + Index: reqReal.MinQueryIndex, + Filter: reqReal.Filter, + Datacenter: reqReal.Datacenter, + } + + // Connect requests need a different topic + if reqReal.Connect { + r.Topic = agentpb.Topic_ServiceHealthConnect + } + + view := MaterializedViewFromFetch(c, opts, r) + return view.Fetch(opts) +} + +// SupportsBlocking implements cache.Type +func (c *StreamingHealthServices) SupportsBlocking() bool { + return true +} + +// NewMaterializedView implements StreamingCacheType +func (c *StreamingHealthServices) NewMaterializedViewState() MaterializedViewState { + return &healthViewState{ + state: make(map[string]structs.CheckServiceNode), + } +} + +// StreamingClient implements StreamingCacheType +func (c *StreamingHealthServices) StreamingClient() StreamingClient { + return c.client +} + +// Logger implements StreamingCacheType +func (c *StreamingHealthServices) Logger() hclog.Logger { + return c.logger +} + +// healthViewState implements MaterializedViewState for storing the view state +// of a service health result. 
We store it as a map to make updates and +// deletions a little easier but we could just store a result type +// (IndexedCheckServiceNodes) and update it in place for each event - that +// involves re-sorting each time etc. though. +type healthViewState struct { + state map[string]structs.CheckServiceNode + filter *bexpr.Filter +} + +// InitFilter implements MaterializedViewState +func (s *healthViewState) InitFilter(expression string) error { + // We apply filtering to the raw CheckServiceNodes before we are done mutating + // state in Update to save from storing stuff in memory we'll only filter + // later. Because the state is just a map of those types, we can simply run + // that map through filter and it will remove any entries that don't match. + filter, err := bexpr.CreateFilter(expression, nil, s.state) + if err != nil { + return err + } + s.filter = filter + return nil +} + +// Update implements MaterializedViewState +func (s *healthViewState) Update(events []*agentpb.Event) error { + for _, event := range events { + serviceHealth := event.GetServiceHealth() + if serviceHealth == nil { + return fmt.Errorf("unexpected event type for service health view: %T", + event.GetPayload()) + } + node := serviceHealth.CheckServiceNode + id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID) + + switch serviceHealth.Op { + case agentpb.CatalogOp_Register: + checkServiceNode, err := serviceHealth.CheckServiceNode.ToStructs() + if err != nil { + return err + } + s.state[id] = *checkServiceNode + case agentpb.CatalogOp_Deregister: + delete(s.state, id) + } + } + if s.filter != nil { + filtered, err := s.filter.Execute(s.state) + if err != nil { + return err + } + s.state = filtered.(map[string]structs.CheckServiceNode) + } + return nil +} + +// Result implements MaterializedViewState +func (s *healthViewState) Result(index uint64) (interface{}, error) { + var result structs.IndexedCheckServiceNodes + // Avoid a nil slice if there are no results in the view + result.Nodes = structs.CheckServiceNodes{} + for _, node := range s.state { + result.Nodes = append(result.Nodes, node) + } + result.Index = index + return &result, nil +} diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go new file mode 100644 index 0000000000..06f346a355 --- /dev/null +++ b/agent/cache-types/streaming_health_services_test.go @@ -0,0 +1,485 @@ +package cachetype + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-hclog" + + "github.com/stretchr/testify/require" +) + +func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { + client := NewTestStreamingClient() + typ := StreamingHealthServices{ + client: client, + logger: hclog.Default(), + } + + // Initially there are no services registered. Server should send an + // EndOfSnapshot message immediately with index of 1. + eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 1) + client.QueueEvents(&eosEv) + + // This contains the view state so important we share it between calls. 
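+	// The MaterializedView is carried between Fetch calls in
+	// LastResult.State, so reusing the same opts keeps the underlying
+	// subscription and its accumulated view state alive.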
+ opts := cache.FetchOptions{ + MinIndex: 0, + Timeout: 1 * time.Second, + } + req := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + empty := &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{}, + QueryMeta: structs.QueryMeta{ + Index: 1, + }, + } + + require.True(t, t.Run("empty snapshot returned", func(t *testing.T) { + // Fetch should return an empty + // result of the right type with a non-zero index, and in the background begin + // streaming updates. + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + + require.Equal(t, uint64(1), result.Index) + require.Equal(t, empty, result.Value) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("blocks for timeout", func(t *testing.T) { + // Subsequent fetch should block for the timeout + start := time.Now() + opts.Timeout = 200 * time.Millisecond + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until timeout") + + require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") + require.Equal(t, empty, result.Value, "result value should not have changed") + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("blocks until update", func(t *testing.T) { + // Make another blocking query with a longer timeout and trigger an update + // event part way through. + start := time.Now() + go func() { + time.Sleep(200 * time.Millisecond) + + // Then a service registers + healthEv := agentpb.TestEventServiceHealthRegister(t, 4, 1, "web") + client.QueueEvents(&healthEv) + }() + + opts.Timeout = time.Second + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until the event was delivered") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(4), result.Index, "result index should not have changed") + require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1, + "result value should contain the new registration") + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("reconnects and resumes after transient stream error", func(t *testing.T) { + // Use resetErr just because it's "temporary" this is a stand in for any + // network error that uses that same interface though. + client.QueueErr(resetErr("broken pipe")) + + // After the error the view should re-subscribe with same index so will get + // a "resume stream". + resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex) + client.QueueEvents(&resumeEv) + + // Next fetch will continue to block until timeout and receive the same + // result. 
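+		// The queued error is Temporary and is the first consecutive failure,
+		// so the view retries the subscription internally without surfacing an
+		// error to this Fetch call.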
+ start := time.Now() + opts.Timeout = 200 * time.Millisecond + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until timeout") + + require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") + require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed") + + opts.MinIndex = result.Index + opts.LastResult = &result + + // But an update should still be noticed due to reconnection + healthEv := agentpb.TestEventServiceHealthRegister(t, 10, 2, "web") + client.QueueEvents(&healthEv) + + start = time.Now() + opts.Timeout = time.Second + result, err = typ.Fetch(opts, req) + require.NoError(t, err) + elapsed = time.Since(start) + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(10), result.Index, "result index should not have changed") + require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2, + "result value should contain the new registration") + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("returns non-temporary error to watchers", func(t *testing.T) { + // Wait and send the error while fetcher is waiting + go func() { + time.Sleep(200 * time.Millisecond) + client.QueueErr(errors.New("invalid request")) + + // After the error the view should re-subscribe with same index so will get + // a "resume stream". + resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex) + client.QueueEvents(&resumeEv) + }() + + // Next fetch should return the error + start := time.Now() + opts.Timeout = time.Second + result, err := typ.Fetch(opts, req) + require.Error(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until error was sent") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") + // We don't require instances to be returned in same order so we use + // elementsMatch which is recursive. + requireResultsSame(t, + opts.LastResult.Value.(*structs.IndexedCheckServiceNodes), + result.Value.(*structs.IndexedCheckServiceNodes), + ) + + opts.MinIndex = result.Index + opts.LastResult = &result + + // But an update should still be noticed due to reconnection + healthEv := agentpb.TestEventServiceHealthRegister(t, opts.MinIndex+5, 3, "web") + client.QueueEvents(&healthEv) + + opts.Timeout = time.Second + result, err = typ.Fetch(opts, req) + require.NoError(t, err) + elapsed = time.Since(start) + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed") + require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, + "result value should contain the new registration") + + opts.MinIndex = result.Index + opts.LastResult = &result + })) +} + +// requireResultsSame compares two IndexedCheckServiceNodes without requiring +// the same order of results (which vary due to map usage internally). 
+func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) { + require.Equal(t, want.Index, got.Index) + + svcIDs := func(csns structs.CheckServiceNodes) []string { + res := make([]string, 0, len(csns)) + for _, csn := range csns { + res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID)) + } + return res + } + + gotIDs := svcIDs(got.Nodes) + wantIDs := svcIDs(want.Nodes) + + require.ElementsMatch(t, wantIDs, gotIDs) +} + +func TestStreamingHealthServices_FullSnapshot(t *testing.T) { + client := NewTestStreamingClient() + typ := StreamingHealthServices{ + client: client, + logger: hclog.Default(), + } + + // Create an initial snapshot of 3 instances on different nodes + makeReg := func(index uint64, nodeNum int) *agentpb.Event { + e := agentpb.TestEventServiceHealthRegister(t, index, nodeNum, "web") + return &e + } + eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5) + client.QueueEvents( + makeReg(5, 1), + makeReg(5, 2), + makeReg(5, 3), + &eosEv, + ) + + // This contains the view state so important we share it between calls. + opts := cache.FetchOptions{ + MinIndex: 0, + Timeout: 1 * time.Second, + } + req := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + + gatherNodes := func(res interface{}) []string { + nodes := make([]string, 0, 3) + r := res.(*structs.IndexedCheckServiceNodes) + for _, csn := range r.Nodes { + nodes = append(nodes, csn.Node.Node) + } + return nodes + } + + require.True(t, t.Run("full snapshot returned", func(t *testing.T) { + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + + require.Equal(t, uint64(5), result.Index) + require.ElementsMatch(t, []string{"node1", "node2", "node3"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("blocks until deregistration", func(t *testing.T) { + // Make another blocking query with a longer timeout and trigger an update + // event part way through. + start := time.Now() + go func() { + time.Sleep(200 * time.Millisecond) + + // Deregister instance on node1 + healthEv := agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web") + client.QueueEvents(&healthEv) + }() + + opts.Timeout = time.Second + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until the event was delivered") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(20), result.Index) + require.ElementsMatch(t, []string{"node2", "node3"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("server reload is respected", func(t *testing.T) { + // Simulates the server noticing the request's ACL token privs changing. To + // detect this we'll queue up the new snapshot as a different set of nodes + // to the first. + resetEv := agentpb.TestEventResetStream(t, agentpb.Topic_ServiceHealth, 45) + eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 50) + client.QueueEvents( + &resetEv, + makeReg(50, 3), // overlap existing node + makeReg(50, 4), + makeReg(50, 5), + &eosEv, + ) + + // Make another blocking query with THE SAME index. It should immediately + // return the new snapshot. 
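+		// The ResetStream event caused the view to discard its state and
+		// re-subscribe from index zero, so the server's new snapshot at index
+		// 50 is returned without waiting for the timeout.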
+ start := time.Now() + opts.Timeout = time.Second + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(50), result.Index) + require.ElementsMatch(t, []string{"node3", "node4", "node5"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) +} + +func TestStreamingHealthServices_EventBatches(t *testing.T) { + client := NewTestStreamingClient() + typ := StreamingHealthServices{ + client: client, + logger: hclog.Default(), + } + + // Create an initial snapshot of 3 instances but in a single event batch + batchEv := agentpb.TestEventBatchWithEvents(t, + agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"), + agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"), + agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"), + ) + eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5) + + client.QueueEvents( + &batchEv, + &eosEv, + ) + + // This contains the view state so important we share it between calls. + opts := cache.FetchOptions{ + MinIndex: 0, + Timeout: 1 * time.Second, + } + req := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + + gatherNodes := func(res interface{}) []string { + nodes := make([]string, 0, 3) + r := res.(*structs.IndexedCheckServiceNodes) + for _, csn := range r.Nodes { + nodes = append(nodes, csn.Node.Node) + } + return nodes + } + + require.True(t, t.Run("full snapshot returned", func(t *testing.T) { + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + + require.Equal(t, uint64(5), result.Index) + require.ElementsMatch(t, []string{"node1", "node2", "node3"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("batched updates work too", func(t *testing.T) { + // Simulate multiple registrations happening in one Txn (so all have same + // index) + batchEv := agentpb.TestEventBatchWithEvents(t, + // Deregister an existing node + agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"), + // Register another + agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"), + ) + client.QueueEvents(&batchEv) + opts.Timeout = time.Second + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + + require.Equal(t, uint64(20), result.Index) + require.ElementsMatch(t, []string{"node2", "node3", "node4"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) +} + +func TestStreamingHealthServices_Filtering(t *testing.T) { + client := NewTestStreamingClient() + typ := StreamingHealthServices{ + client: client, + logger: hclog.Default(), + } + + // Create an initial snapshot of 3 instances but in a single event batch + batchEv := agentpb.TestEventBatchWithEvents(t, + agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"), + agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"), + agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"), + ) + eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5) + client.QueueEvents( + &batchEv, + &eosEv, + ) + + // This contains the view state so important we share it between calls. 
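+	// The Filter in QueryOptions below is passed through to the view's
+	// InitFilter, so instances that don't match are dropped during Update and
+	// never retained in the view.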
+ opts := cache.FetchOptions{ + MinIndex: 0, + Timeout: 1 * time.Second, + } + req := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + QueryOptions: structs.QueryOptions{ + Filter: `Node.Node == "node2"`, + }, + } + + gatherNodes := func(res interface{}) []string { + nodes := make([]string, 0, 3) + r := res.(*structs.IndexedCheckServiceNodes) + for _, csn := range r.Nodes { + nodes = append(nodes, csn.Node.Node) + } + return nodes + } + + require.True(t, t.Run("filtered snapshot returned", func(t *testing.T) { + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + + require.Equal(t, uint64(5), result.Index) + require.ElementsMatch(t, []string{"node2"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) + + require.True(t, t.Run("filtered updates work too", func(t *testing.T) { + // Simulate multiple registrations happening in one Txn (so all have same + // index) + batchEv := agentpb.TestEventBatchWithEvents(t, + // Deregister an existing node + agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"), + // Register another + agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"), + ) + client.QueueEvents(&batchEv) + opts.Timeout = time.Second + result, err := typ.Fetch(opts, req) + require.NoError(t, err) + + require.Equal(t, uint64(20), result.Index) + require.ElementsMatch(t, []string{"node2"}, + gatherNodes(result.Value)) + + opts.MinIndex = result.Index + opts.LastResult = &result + })) +} diff --git a/agent/cache-types/streaming_materialized_view.go b/agent/cache-types/streaming_materialized_view.go new file mode 100644 index 0000000000..d70371e534 --- /dev/null +++ b/agent/cache-types/streaming_materialized_view.go @@ -0,0 +1,447 @@ +package cachetype + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc" +) + +const ( + // SubscribeBackoffMax controls the range of exponential backoff when errors + // are returned from subscriptions. + SubscribeBackoffMax = 60 * time.Second +) + +// StreamingClient is the interface we need from the gRPC client stub. Separate +// interface simplifies testing. +type StreamingClient interface { + Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) +} + +// MaterializedViewState is the interface used to manage they type-specific +// materialized view logic. +type MaterializedViewState interface { + // InitFilter is called once when the view is constructed if the subscription + // has a non-empty Filter argument. The implementor is expected to create a + // *bexpr.Filter and store it locally so it can be used to filter events + // and/or results. Ideally filtering should occur inside `Update` calls such + // that we don't store objects in the view state that are just filtered when + // the result is returned, however in some cases it might not be possible and + // the type may choose to store the whole view and only apply filtering in the + // Result method just before returning a result. + InitFilter(expression string) error + + // Update is called when one or more events are received. The first call will + // include _all_ events in the initial snapshot which may be an empty set. + // Subsequent calls will contain one or more update events in the order they + // are received. 
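+	// Update is always invoked while the MaterializedView holds its lock, so
+	// implementations need no internal synchronization.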
+ Update(events []*agentpb.Event) error + + // Result returns the type-specific cache result based on the state. When no + // events have been delivered yet the result should be an empty value type + // suitable to return to clients in case there is an empty result on the + // servers. The index the materialized view represents is maintained + // separately and passed in in case the return type needs an Index field + // populating. This allows implementations to not worry about maintaining + // indexes seen during Update. + Result(index uint64) (interface{}, error) +} + +// StreamingCacheType is the interface a cache-type needs to implement to make +// use of streaming as the transport for updates from the server. +type StreamingCacheType interface { + NewMaterializedViewState() MaterializedViewState + StreamingClient() StreamingClient + Logger() hclog.Logger +} + +// temporary is a private interface as used by net and other std lib packages to +// show error types represent temporary/recoverable errors. +type temporary interface { + Temporary() bool +} + +// resetErr represents a server request to reset the subscription, it's typed so +// we can mark it as temporary and so attempt to retry first time without +// notifying clients. +type resetErr string + +// Temporary Implements the internal Temporary interface +func (e resetErr) Temporary() bool { + return true +} + +// Error implements error +func (e resetErr) Error() string { + return string(e) +} + +// MaterializedView is a partial view of the state on servers, maintained via +// streaming subscriptions. It is specialized for different cache types by +// providing a MaterializedViewState that encapsulates the logic to update the +// state and format it as the correct result type. +// +// The MaterializedView object becomes the cache.Result.State for a streaming +// cache type and manages the actual streaming RPC call to the servers behind +// the scenes until the cache result is discarded when TTL expires. +type MaterializedView struct { + // Properties above the lock are immutable after the view is constructed in + // MaterializedViewFromFetch and must not be modified. + typ StreamingCacheType + client StreamingClient + logger hclog.Logger + req agentpb.SubscribeRequest + ctx context.Context + cancelFunc func() + + // l protects the mutable state - all fields below it must only be accessed + // while holding l. + l sync.Mutex + index uint64 + state MaterializedViewState + snapshotDone bool + updateCh chan struct{} + retryWaiter *lib.RetryWaiter + err error + fatalErr error +} + +// MaterializedViewFromFetch retrieves an existing view from the cache result +// state if one exists, otherwise creates a new one. Note that the returned view +// MUST have Close called eventually to avoid leaking resources. Typically this +// is done automatically if the view is returned in a cache.Result.State when +// the cache evicts the result. If the view is not returned in a result state +// though Close must be called some other way to avoid leaking the goroutine and +// memory. 
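+// The cache handles this by checking whether an expired entry's State
+// implements io.Closer (see the change to runExpiryLoop in
+// agent/cache/cache.go below).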
+func MaterializedViewFromFetch(t StreamingCacheType, opts cache.FetchOptions, + subReq agentpb.SubscribeRequest) *MaterializedView { + + if opts.LastResult == nil || opts.LastResult.State == nil { + ctx, cancel := context.WithCancel(context.Background()) + v := &MaterializedView{ + typ: t, + client: t.StreamingClient(), + logger: t.Logger(), + req: subReq, + ctx: ctx, + cancelFunc: cancel, + // Allow first retry without wait, this is important and we rely on it in + // tests. + retryWaiter: lib.NewRetryWaiter(1, 0, SubscribeBackoffMax, + lib.NewJitterRandomStagger(100)), + } + // Run init now otherwise there is a race between run() and a call to Fetch + // which expects a view state to exist. + v.reset() + go v.run() + return v + } + return opts.LastResult.State.(*MaterializedView) +} + +// Close implements io.Close and discards view state and stops background view +// maintenance. +func (v *MaterializedView) Close() error { + v.l.Lock() + defer v.l.Unlock() + if v.cancelFunc != nil { + v.cancelFunc() + } + return nil +} + +func (v *MaterializedView) run() { + if v.ctx.Err() != nil { + return + } + + // Loop in case stream resets and we need to start over + for { + // Run a subscribe call until it fails + err := v.runSubscription() + if err != nil { + // Check if the view closed + if v.ctx.Err() != nil { + // Err doesn't matter and is likely just context cancelled + return + } + + v.l.Lock() + // If this is a temporary error and it's the first consecutive failure, + // retry to see if we can get a result without erroring back to clients. + // If it's non-temporary or a repeated failure return to clients while we + // retry to get back in a good state. + if _, ok := err.(temporary); !ok || v.retryWaiter.Failures() > 0 { + // Report error to blocked fetchers + v.err = err + v.notifyUpdateLocked() + } + waitCh := v.retryWaiter.Failed() + failures := v.retryWaiter.Failures() + v.l.Unlock() + + // Exponential backoff to avoid hammering servers if they are closing + // conns because of overload or resetting based on errors. + v.logger.Error("subscribe call failed", "err", err, "topic", v.req.Topic, + "key", v.req.Key, "failure_count", failures) + + select { + case <-v.ctx.Done(): + return + case <-waitCh: + } + } + // Loop and keep trying to resume subscription after error + } + +} + +// runSubscription opens a new subscribe streaming call to the servers and runs +// for it's lifetime or until the view is closed. +func (v *MaterializedView) runSubscription() error { + ctx, cancel := context.WithCancel(v.ctx) + defer cancel() + + // Copy the request template + req := v.req + + v.l.Lock() + + // Update request index to be the current view index in case we are resuming a + // broken stream. + req.Index = v.index + + // Make local copy so we don't have to read with a lock for every event. We + // are the only goroutine that can update so we know it won't change without + // us knowing but we do need lock to protect external readers when we update. + snapshotDone := v.snapshotDone + + v.l.Unlock() + + s, err := v.client.Subscribe(ctx, &req) + if err != nil { + return err + } + + snapshotEvents := make([]*agentpb.Event, 0) + + for { + event, err := s.Recv() + if err != nil { + return err + } + + if event.GetResetStream() { + // Server has requested we reset the view and start with a fresh snapshot + // - perhaps because our ACL policy changed. We reset the view state and + // then return an error to allow the `run` method to retry after a backoff + // if necessary. 
+ v.reset() + return resetErr("stream reset requested") + } + + if event.GetEndOfSnapshot() { + // Hold lock while mutating view state so implementer doesn't need to + // worry about synchronization. + v.l.Lock() + + // Deliver snapshot events to the View state + if err := v.state.Update(snapshotEvents); err != nil { + v.l.Unlock() + // This error is kinda fatal to the view - we didn't apply some events + // the server sent us which means our view is now not in sync. The only + // thing we can do is start over and hope for a better outcome. + v.reset() + return err + } + // Done collecting these now + snapshotEvents = nil + v.snapshotDone = true + // update our local copy so we can read it without lock. + snapshotDone = true + v.index = event.Index + // We have a good result, reset the error flag + v.err = nil + v.retryWaiter.Reset() + // Notify watchers of the update to the view + v.notifyUpdateLocked() + v.l.Unlock() + continue + } + + if event.GetResumeStream() { + // We've opened a new subscribe with a non-zero index to resume a + // connection and the server confirms it's not sending a new snapshot. + if !snapshotDone { + // We've somehow got into a bad state here - the server thinks we have + // an up-to-date snapshot but we don't think we do. Reset and start + // over. + v.reset() + return errors.New("stream resume sent but no local snapshot") + } + // Just continue on as we were! + continue + } + + // We have an event for the topic + events := []*agentpb.Event{event} + + // If the event is a batch, unwrap and deliver the raw events + if batch := event.GetEventBatch(); batch != nil { + events = batch.Events + } + + if snapshotDone { + // We've already got a snapshot, this is an update, deliver it right away. + v.l.Lock() + if err := v.state.Update(events); err != nil { + v.l.Unlock() + // This error is kinda fatal to the view - we didn't apply some events + // the server sent us which means our view is now not in sync. The only + // thing we can do is start over and hope for a better outcome. + v.reset() + return err + } + // Notify watchers of the update to the view + v.index = event.Index + // We have a good result, reset the error flag + v.err = nil + v.retryWaiter.Reset() + v.notifyUpdateLocked() + v.l.Unlock() + } else { + snapshotEvents = append(snapshotEvents, events...) + } + } +} + +// reset clears the state ready to start a new stream from scratch. +func (v *MaterializedView) reset() { + v.l.Lock() + defer v.l.Unlock() + + v.state = v.typ.NewMaterializedViewState() + if v.req.Filter != "" { + if err := v.state.InitFilter(v.req.Filter); err != nil { + // If this errors we are stuck - it's fatal for the whole request as it + // means there was a bug or an invalid filter string we couldn't parse. + // Stop the whole view by closing it and cancelling context, but also set + // the error internally so that Fetch calls can return a meaningful error + // and not just "context cancelled". + v.fatalErr = err + if v.cancelFunc != nil { + v.cancelFunc() + } + return + } + } + v.notifyUpdateLocked() + // Always start from zero when we have a new state so we load a snapshot from + // the servers. + v.index = 0 + v.snapshotDone = false + v.err = nil + v.retryWaiter.Reset() +} + +// notifyUpdateLocked closes the current update channel and recreates a new +// one. It must be called while holding the s.l lock. 
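+// Closing the old channel acts as a broadcast: every Fetch goroutine blocked
+// on it wakes up, re-reads the view state under the lock and picks up the
+// fresh updateCh if it needs to keep waiting.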
+func (v *MaterializedView) notifyUpdateLocked() { + if v.updateCh != nil { + close(v.updateCh) + } + v.updateCh = make(chan struct{}) +} + +// Fetch implements the logic a StreamingCacheType will need during it's Fetch +// call. Cache types that use streaming should just be able to proxy to this +// once they have a subscription object and return it's results directly. +func (v *MaterializedView) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) { + var result cache.FetchResult + + // Get current view Result and index + v.l.Lock() + index := v.index + val, err := v.state.Result(v.index) + updateCh := v.updateCh + v.l.Unlock() + + if err != nil { + return result, err + } + + result.Index = index + result.Value = val + result.State = v + + // If our index is > req.Index return right away. If index is zero then we + // haven't loaded a snapshot at all yet which means we should wait for one on + // the update chan. Note it's opts.MinIndex that the cache is using here the + // request min index might be different and from initial user request. + if index > 0 && index > opts.MinIndex { + return result, nil + } + + // Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout + // since that is the timeout the client requested from the cache Get while the + // options one is the internal "background refresh" timeout which is what the + // Fetch call should be using. + timeoutCh := time.After(opts.Timeout) + for { + select { + case <-updateCh: + // View updated, return the new result + v.l.Lock() + result.Index = v.index + // Grab the new updateCh in case we need to keep waiting for the next + // update. + updateCh = v.updateCh + fetchErr := v.err + if fetchErr == nil { + // Only generate a new result if there was no error to avoid pointless + // work potentially shuffling the same data around. + result.Value, err = v.state.Result(v.index) + } + v.l.Unlock() + + // If there was a non-transient error return it + if fetchErr != nil { + return result, fetchErr + } + if err != nil { + return result, err + } + + // Sanity check the update is actually later than the one the user + // requested. + if result.Index <= opts.MinIndex { + // The result is still older/same as the requested index, continue to + // wait for further updates. + continue + } + + // Return the updated result + return result, nil + + case <-timeoutCh: + // Just return whatever we got originally, might still be empty + return result, nil + + case <-v.ctx.Done(): + v.l.Lock() + err := v.fatalErr + v.l.Unlock() + if err != nil { + return result, err + } + return result, v.ctx.Err() + } + } +} diff --git a/agent/cache-types/streaming_test.go b/agent/cache-types/streaming_test.go new file mode 100644 index 0000000000..59ca481c72 --- /dev/null +++ b/agent/cache-types/streaming_test.go @@ -0,0 +1,67 @@ +package cachetype + +import ( + "context" + + "github.com/hashicorp/consul/agent/agentpb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// TestStreamingClient is a mock StreamingClient for testing that allows +// for queueing up custom events to a subscriber. 
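+// Subscribe returns the client itself, so the same value also serves as the
+// agentpb.Consul_SubscribeClient stream that Recv is called on.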
+type TestStreamingClient struct { + events chan eventOrErr + ctx context.Context +} + +type eventOrErr struct { + Err error + Event *agentpb.Event +} + +func NewTestStreamingClient() *TestStreamingClient { + return &TestStreamingClient{ + events: make(chan eventOrErr, 32), + } +} + +func (t *TestStreamingClient) Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) { + t.ctx = ctx + + return t, nil +} + +func (t *TestStreamingClient) QueueEvents(events ...*agentpb.Event) { + for _, e := range events { + t.events <- eventOrErr{Event: e} + } +} + +func (t *TestStreamingClient) QueueErr(err error) { + t.events <- eventOrErr{Err: err} +} + +func (t *TestStreamingClient) Recv() (*agentpb.Event, error) { + select { + case eoe := <-t.events: + if eoe.Err != nil { + return nil, eoe.Err + } + return eoe.Event, nil + case <-t.ctx.Done(): + return nil, t.ctx.Err() + } +} + +func (t *TestStreamingClient) Header() (metadata.MD, error) { return nil, nil } + +func (t *TestStreamingClient) Trailer() metadata.MD { return nil } + +func (t *TestStreamingClient) CloseSend() error { return nil } + +func (t *TestStreamingClient) Context() context.Context { return nil } + +func (t *TestStreamingClient) SendMsg(m interface{}) error { return nil } + +func (t *TestStreamingClient) RecvMsg(m interface{}) error { return nil } diff --git a/agent/cache/cache.go b/agent/cache/cache.go index b92feb5a50..661dd756b6 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -18,6 +18,7 @@ import ( "container/heap" "context" "fmt" + io "io" "strconv" "sync" "sync/atomic" @@ -773,6 +774,12 @@ func (c *Cache) runExpiryLoop() { case <-expiryCh: c.entriesLock.Lock() + // Perform cleanup operations on the entry's state, if applicable. + state := c.entries[entry.Key].State + if closer, ok := state.(io.Closer); ok { + closer.Close() + } + // Entry expired! Remove it. delete(c.entries, entry.Key) heap.Remove(c.entriesExpiryHeap, entry.HeapIndex) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index c2442ea7c1..cf665de0ed 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -841,6 +841,50 @@ func TestCacheGet_expireResetGet(t *testing.T) { typ.AssertExpectations(t) } +type testCloser struct { + closed bool +} + +func (t *testCloser) Close() error { + t.closed = true + return nil +} + +// Test that entries with state that satisfies io.Closer get cleaned up +func TestCacheGet_expireClose(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := New(Options{}) + typ.On("RegisterOptions").Return(RegisterOptions{ + LastGetTTL: 100 * time.Millisecond, + }) + + // Register the type with a timeout + c.RegisterType("t", typ) + + // Configure the type + state := &testCloser{} + typ.Static(FetchResult{Value: 42, State: state}, nil).Times(1) + + ctx := context.Background() + req := TestRequest(t, RequestInfo{Key: "hello"}) + result, meta, err := c.Get(ctx, "t", req) + require.NoError(err) + require.Equal(42, result) + require.False(meta.Hit) + require.False(state.closed) + + // Sleep for the expiry + time.Sleep(200 * time.Millisecond) + + // state.Close() should have been called + require.True(state.closed) +} + // Test a Get with a request that returns the same cache key across // two different "types" returns two separate results. func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {