diff --git a/agent/cache-types/streaming_events_test.go b/agent/cache-types/streaming_events_test.go new file mode 100644 index 0000000000..4019ee8f6b --- /dev/null +++ b/agent/cache-types/streaming_events_test.go @@ -0,0 +1,174 @@ +package cachetype + +import ( + fmt "fmt" + + "github.com/hashicorp/consul/proto/pbcommon" + "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/types" +) + +func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { + return &pbsubscribe.Event{ + Topic: topic, + Index: index, + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + } +} + +func newEndOfEmptySnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { + return &pbsubscribe.Event{ + Topic: topic, + Index: index, + Payload: &pbsubscribe.Event_EndOfEmptySnapshot{EndOfEmptySnapshot: true}, + } +} + +// newEventServiceHealthRegister returns a realistically populated service +// health registration event for tests. The nodeNum is a +// logical node and is used to create the node name ("node%d") but also change +// the node ID and IP address to make it a little more realistic for cases that +// need that. nodeNum should be less than 64k to make the IP address look +// realistic. Any other changes can be made on the returned event to avoid +// adding too many options to callers. +func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) pbsubscribe.Event { + node := fmt.Sprintf("node%d", nodeNum) + nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) + addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) + + return pbsubscribe.Event{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: svc, + Index: index, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + ID: nodeID, + Node: node, + Address: addr, + Datacenter: "dc1", + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &pbservice.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + // Empty sadness + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, + }, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Checks: []*pbservice.HealthCheck{ + { + Node: node, + CheckID: "serf-health", + Name: "serf-health", + Status: "passing", + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + { + Node: node, + CheckID: types.CheckID("service:" + svc), + Name: "service:" + svc, + ServiceID: svc, + ServiceName: svc, + Type: "ttl", + Status: "passing", + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + }, + }, + }, + }, + } +} + +// TestEventServiceHealthDeregister returns a realistically populated service +// health deregistration event for tests. The nodeNum is a +// logical node and is used to create the node name ("node%d") but also change +// the node ID and IP address to make it a little more realistic for cases that +// need that. nodeNum should be less than 64k to make the IP address look +// realistic. 
Any other changes can be made on the returned event to avoid +// adding too many options to callers. +func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) pbsubscribe.Event { + node := fmt.Sprintf("node%d", nodeNum) + + return pbsubscribe.Event{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: svc, + Index: index, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Deregister, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: node, + }, + Service: &pbservice.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + // Empty sadness + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, + }, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: pbcommon.RaftIndex{ + // The original insertion index since a delete doesn't update + // this. This magic value came from state store tests where we + // setup at index 10 and then mutate at index 100. It can be + // modified by the caller later and makes it easier than having + // yet another argument in the common case. + CreateIndex: 10, + ModifyIndex: 10, + }, + }, + }, + }, + }, + } +} + +func newEventBatchWithEvents(first pbsubscribe.Event, evs ...pbsubscribe.Event) pbsubscribe.Event { + events := make([]*pbsubscribe.Event, len(evs)+1) + events[0] = &first + for i := range evs { + events[i+1] = &evs[i] + } + return pbsubscribe.Event{ + Topic: first.Topic, + Index: first.Index, + Payload: &pbsubscribe.Event_EventBatch{ + EventBatch: &pbsubscribe.EventBatch{Events: events}, + }, + } +} diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index 2be1b40e92..893069820e 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -3,11 +3,13 @@ package cachetype import ( "fmt" - "github.com/hashicorp/consul/agent/agentpb" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/proto/pbsubscribe" ) const ( @@ -40,18 +42,20 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque "Internal cache failure: request wrong type: %T", req) } - r := agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, - Key: reqReal.ServiceName, - Token: reqReal.Token, - Index: reqReal.MinQueryIndex, - Filter: reqReal.Filter, - Datacenter: reqReal.Datacenter, + r := Request{ + SubscribeRequest: pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: reqReal.ServiceName, + Token: reqReal.Token, + Index: reqReal.MinQueryIndex, + Datacenter: reqReal.Datacenter, + }, + Filter: reqReal.Filter, } // Connect requests need a different topic if reqReal.Connect { - r.Topic = agentpb.Topic_ServiceHealthConnect + r.Topic = pbsubscribe.Topic_ServiceHealthConnect } view := MaterializedViewFromFetch(c, opts, r) @@ -105,7 +109,7 @@ func (s *healthViewState) InitFilter(expression string) error { } // Update implements MaterializedViewState -func (s *healthViewState) Update(events []*agentpb.Event) error { +func (s *healthViewState) Update(events []*pbsubscribe.Event) error { for _, event := range events { 
serviceHealth := event.GetServiceHealth() if serviceHealth == nil { @@ -116,13 +120,10 @@ func (s *healthViewState) Update(events []*agentpb.Event) error { id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID) switch serviceHealth.Op { - case agentpb.CatalogOp_Register: - checkServiceNode, err := serviceHealth.CheckServiceNode.ToStructs() - if err != nil { - return err - } + case pbsubscribe.CatalogOp_Register: + checkServiceNode := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode) s.state[id] = *checkServiceNode - case agentpb.CatalogOp_Deregister: + case pbsubscribe.CatalogOp_Deregister: delete(s.state, id) } } diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 06f346a355..830d373d8b 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -6,12 +6,14 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-hclog" - - "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/proto/pbsubscribe" ) func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { @@ -23,8 +25,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // Initially there are no services registered. Server should send an // EndOfSnapshot message immediately with index of 1. - eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 1) - client.QueueEvents(&eosEv) + client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -42,7 +43,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { }, } - require.True(t, t.Run("empty snapshot returned", func(t *testing.T) { + runStep(t, "empty snapshot returned", func(t *testing.T) { // Fetch should return an empty // result of the right type with a non-zero index, and in the background begin // streaming updates. @@ -54,9 +55,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("blocks for timeout", func(t *testing.T) { + runStep(t, "blocks for timeout", func(t *testing.T) { // Subsequent fetch should block for the timeout start := time.Now() opts.Timeout = 200 * time.Millisecond @@ -71,9 +72,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("blocks until update", func(t *testing.T) { + runStep(t, "blocks until update", func(t *testing.T) { // Make another blocking query with a longer timeout and trigger an update // event part way through. 
start := time.Now() @@ -81,7 +82,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { time.Sleep(200 * time.Millisecond) // Then a service registers - healthEv := agentpb.TestEventServiceHealthRegister(t, 4, 1, "web") + healthEv := newEventServiceHealthRegister(4, 1, "web") client.QueueEvents(&healthEv) }() @@ -100,17 +101,16 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("reconnects and resumes after transient stream error", func(t *testing.T) { + runStep(t, "reconnects and resumes after transient stream error", func(t *testing.T) { // Use resetErr just because it's "temporary" this is a stand in for any // network error that uses that same interface though. client.QueueErr(resetErr("broken pipe")) // After the error the view should re-subscribe with same index so will get // a "resume stream". - resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex) - client.QueueEvents(&resumeEv) + client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex)) // Next fetch will continue to block until timeout and receive the same // result. @@ -129,7 +129,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.LastResult = &result // But an update should still be noticed due to reconnection - healthEv := agentpb.TestEventServiceHealthRegister(t, 10, 2, "web") + healthEv := newEventServiceHealthRegister(10, 2, "web") client.QueueEvents(&healthEv) start = time.Now() @@ -146,9 +146,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("returns non-temporary error to watchers", func(t *testing.T) { + runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { // Wait and send the error while fetcher is waiting go func() { time.Sleep(200 * time.Millisecond) @@ -156,8 +156,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // After the error the view should re-subscribe with same index so will get // a "resume stream". 
- resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex) - client.QueueEvents(&resumeEv) + client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex)) }() // Next fetch should return the error @@ -183,7 +182,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.LastResult = &result // But an update should still be noticed due to reconnection - healthEv := agentpb.TestEventServiceHealthRegister(t, opts.MinIndex+5, 3, "web") + healthEv := newEventServiceHealthRegister(opts.MinIndex+5, 3, "web") client.QueueEvents(&healthEv) opts.Timeout = time.Second @@ -199,7 +198,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) } // requireResultsSame compares two IndexedCheckServiceNodes without requiring @@ -229,17 +228,15 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { } // Create an initial snapshot of 3 instances on different nodes - makeReg := func(index uint64, nodeNum int) *agentpb.Event { - e := agentpb.TestEventServiceHealthRegister(t, index, nodeNum, "web") + makeReg := func(index uint64, nodeNum int) *pbsubscribe.Event { + e := newEventServiceHealthRegister(index, nodeNum, "web") return &e } - eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5) client.QueueEvents( makeReg(5, 1), makeReg(5, 2), makeReg(5, 3), - &eosEv, - ) + newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -260,7 +257,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { return nodes } - require.True(t, t.Run("full snapshot returned", func(t *testing.T) { + runStep(t, "full snapshot returned", func(t *testing.T) { result, err := typ.Fetch(opts, req) require.NoError(t, err) @@ -270,9 +267,9 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("blocks until deregistration", func(t *testing.T) { + runStep(t, "blocks until deregistration", func(t *testing.T) { // Make another blocking query with a longer timeout and trigger an update // event part way through. start := time.Now() @@ -280,7 +277,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { time.Sleep(200 * time.Millisecond) // Deregister instance on node1 - healthEv := agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web") + healthEv := newEventServiceHealthDeregister(20, 1, "web") client.QueueEvents(&healthEv) }() @@ -299,21 +296,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("server reload is respected", func(t *testing.T) { + runStep(t, "server reload is respected", func(t *testing.T) { // Simulates the server noticing the request's ACL token privs changing. To // detect this we'll queue up the new snapshot as a different set of nodes // to the first. - resetEv := agentpb.TestEventResetStream(t, agentpb.Topic_ServiceHealth, 45) - eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 50) + client.QueueErr(status.Error(codes.Aborted, "reset by server")) + client.QueueEvents( - &resetEv, makeReg(50, 3), // overlap existing node makeReg(50, 4), makeReg(50, 5), - &eosEv, - ) + newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) // Make another blocking query with THE SAME index. 
It should immediately // return the new snapshot. @@ -331,7 +326,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) } func TestStreamingHealthServices_EventBatches(t *testing.T) { @@ -342,17 +337,13 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { } // Create an initial snapshot of 3 instances but in a single event batch - batchEv := agentpb.TestEventBatchWithEvents(t, - agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"), - agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"), - agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"), - ) - eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5) - + batchEv := newEventBatchWithEvents( + newEventServiceHealthRegister(5, 1, "web"), + newEventServiceHealthRegister(5, 2, "web"), + newEventServiceHealthRegister(5, 3, "web")) client.QueueEvents( &batchEv, - &eosEv, - ) + newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -373,7 +364,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { return nodes } - require.True(t, t.Run("full snapshot returned", func(t *testing.T) { + runStep(t, "full snapshot returned", func(t *testing.T) { result, err := typ.Fetch(opts, req) require.NoError(t, err) @@ -383,16 +374,16 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("batched updates work too", func(t *testing.T) { + runStep(t, "batched updates work too", func(t *testing.T) { // Simulate multiple registrations happening in one Txn (so all have same // index) - batchEv := agentpb.TestEventBatchWithEvents(t, + batchEv := newEventBatchWithEvents( // Deregister an existing node - agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"), + newEventServiceHealthDeregister(20, 1, "web"), // Register another - agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"), + newEventServiceHealthRegister(20, 4, "web"), ) client.QueueEvents(&batchEv) opts.Timeout = time.Second @@ -405,7 +396,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) } func TestStreamingHealthServices_Filtering(t *testing.T) { @@ -416,16 +407,13 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { } // Create an initial snapshot of 3 instances but in a single event batch - batchEv := agentpb.TestEventBatchWithEvents(t, - agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"), - agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"), - agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"), - ) - eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5) + batchEv := newEventBatchWithEvents( + newEventServiceHealthRegister(5, 1, "web"), + newEventServiceHealthRegister(5, 2, "web"), + newEventServiceHealthRegister(5, 3, "web")) client.QueueEvents( &batchEv, - &eosEv, - ) + newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) // This contains the view state so important we share it between calls. 
opts := cache.FetchOptions{ @@ -449,7 +437,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { return nodes } - require.True(t, t.Run("filtered snapshot returned", func(t *testing.T) { + runStep(t, "filtered snapshot returned", func(t *testing.T) { result, err := typ.Fetch(opts, req) require.NoError(t, err) @@ -459,16 +447,16 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) - require.True(t, t.Run("filtered updates work too", func(t *testing.T) { + runStep(t, "filtered updates work too", func(t *testing.T) { // Simulate multiple registrations happening in one Txn (so all have same // index) - batchEv := agentpb.TestEventBatchWithEvents(t, + batchEv := newEventBatchWithEvents( // Deregister an existing node - agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"), + newEventServiceHealthDeregister(20, 1, "web"), // Register another - agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"), + newEventServiceHealthRegister(20, 4, "web"), ) client.QueueEvents(&batchEv) opts.Timeout = time.Second @@ -481,5 +469,11 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { opts.MinIndex = result.Index opts.LastResult = &result - })) + }) +} + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + if !t.Run(name, fn) { + t.FailNow() + } } diff --git a/agent/cache-types/streaming_materialized_view.go b/agent/cache-types/streaming_materialized_view.go index d70371e534..120278f5cf 100644 --- a/agent/cache-types/streaming_materialized_view.go +++ b/agent/cache-types/streaming_materialized_view.go @@ -6,11 +6,14 @@ import ( "sync" "time" - "github.com/hashicorp/consul/agent/agentpb" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-hclog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/proto/pbsubscribe" ) const ( @@ -22,7 +25,7 @@ const ( // StreamingClient is the interface we need from the gRPC client stub. Separate // interface simplifies testing. type StreamingClient interface { - Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) + Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) } // MaterializedViewState is the interface used to manage they type-specific @@ -42,7 +45,7 @@ type MaterializedViewState interface { // include _all_ events in the initial snapshot which may be an empty set. // Subsequent calls will contain one or more update events in the order they // are received. - Update(events []*agentpb.Event) error + Update(events []*pbsubscribe.Event) error // Result returns the type-specific cache result based on the state. When no // events have been delivered yet the result should be an empty value type @@ -83,6 +86,14 @@ func (e resetErr) Error() string { return string(e) } +type Request struct { + pbsubscribe.SubscribeRequest + // Filter is a bexpr filter expression that is used to filter events on the + // client side. + // TODO: is this used? + Filter string +} + // MaterializedView is a partial view of the state on servers, maintained via // streaming subscriptions. 
It is specialized for different cache types by // providing a MaterializedViewState that encapsulates the logic to update the @@ -97,7 +108,7 @@ type MaterializedView struct { typ StreamingCacheType client StreamingClient logger hclog.Logger - req agentpb.SubscribeRequest + req Request ctx context.Context cancelFunc func() @@ -120,9 +131,11 @@ type MaterializedView struct { // the cache evicts the result. If the view is not returned in a result state // though Close must be called some other way to avoid leaking the goroutine and // memory. -func MaterializedViewFromFetch(t StreamingCacheType, opts cache.FetchOptions, - subReq agentpb.SubscribeRequest) *MaterializedView { - +func MaterializedViewFromFetch( + t StreamingCacheType, + opts cache.FetchOptions, + subReq Request, +) *MaterializedView { if opts.LastResult == nil || opts.LastResult.State == nil { ctx, cancel := context.WithCancel(context.Background()) v := &MaterializedView{ @@ -225,26 +238,21 @@ func (v *MaterializedView) runSubscription() error { v.l.Unlock() - s, err := v.client.Subscribe(ctx, &req) + s, err := v.client.Subscribe(ctx, &req.SubscribeRequest) if err != nil { return err } - snapshotEvents := make([]*agentpb.Event, 0) + snapshotEvents := make([]*pbsubscribe.Event, 0) for { event, err := s.Recv() - if err != nil { - return err - } - - if event.GetResetStream() { - // Server has requested we reset the view and start with a fresh snapshot - // - perhaps because our ACL policy changed. We reset the view state and - // then return an error to allow the `run` method to retry after a backoff - // if necessary. + switch { + case isGrpcStatus(err, codes.Aborted): v.reset() return resetErr("stream reset requested") + case err != nil: + return err } if event.GetEndOfSnapshot() { @@ -276,7 +284,7 @@ func (v *MaterializedView) runSubscription() error { continue } - if event.GetResumeStream() { + if event.GetEndOfEmptySnapshot() { // We've opened a new subscribe with a non-zero index to resume a // connection and the server confirms it's not sending a new snapshot. if !snapshotDone { @@ -291,7 +299,7 @@ func (v *MaterializedView) runSubscription() error { } // We have an event for the topic - events := []*agentpb.Event{event} + events := []*pbsubscribe.Event{event} // If the event is a batch, unwrap and deliver the raw events if batch := event.GetEventBatch(); batch != nil { @@ -322,6 +330,11 @@ func (v *MaterializedView) runSubscription() error { } } +func isGrpcStatus(err error, code codes.Code) bool { + s, ok := status.FromError(err) + return ok && s.Code() == code +} + // reset clears the state ready to start a new stream from scratch. 
func (v *MaterializedView) reset() { v.l.Lock() diff --git a/agent/cache-types/streaming_test.go b/agent/cache-types/streaming_test.go index 59ca481c72..929420e94d 100644 --- a/agent/cache-types/streaming_test.go +++ b/agent/cache-types/streaming_test.go @@ -3,9 +3,10 @@ package cachetype import ( "context" - "github.com/hashicorp/consul/agent/agentpb" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + + "github.com/hashicorp/consul/proto/pbsubscribe" ) // TestStreamingClient is a mock StreamingClient for testing that allows @@ -17,7 +18,7 @@ type TestStreamingClient struct { type eventOrErr struct { Err error - Event *agentpb.Event + Event *pbsubscribe.Event } func NewTestStreamingClient() *TestStreamingClient { @@ -26,13 +27,16 @@ func NewTestStreamingClient() *TestStreamingClient { } } -func (t *TestStreamingClient) Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) { +func (t *TestStreamingClient) Subscribe( + ctx context.Context, + _ *pbsubscribe.SubscribeRequest, + _ ...grpc.CallOption, +) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { t.ctx = ctx - return t, nil } -func (t *TestStreamingClient) QueueEvents(events ...*agentpb.Event) { +func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { for _, e := range events { t.events <- eventOrErr{Event: e} } @@ -42,7 +46,7 @@ func (t *TestStreamingClient) QueueErr(err error) { t.events <- eventOrErr{Err: err} } -func (t *TestStreamingClient) Recv() (*agentpb.Event, error) { +func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) { select { case eoe := <-t.events: if eoe.Err != nil { diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index cf665de0ed..782e0b9f2b 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -856,11 +856,12 @@ func TestCacheGet_expireClose(t *testing.T) { require := require.New(t) - typ := TestType(t) + typ := &MockType{} defer typ.AssertExpectations(t) c := New(Options{}) typ.On("RegisterOptions").Return(RegisterOptions{ - LastGetTTL: 100 * time.Millisecond, + SupportsBlocking: true, + LastGetTTL: 100 * time.Millisecond, }) // Register the type with a timeout
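The moving parts introduced above compose as follows: runStep replaces the earlier require.True(t, t.Run(...)) pattern so that a failed sub-step aborts the remaining steps via t.FailNow(), and a server-forced reset is now signalled as a gRPC codes.Aborted status (checked by isGrpcStatus) rather than a dedicated ResetStream event. Below is a minimal sketch of a test that exercises these helpers together; it is illustrative only and not part of the patch. The test name, indexes, and node numbers are invented, and the Fetch assertions are elided.

package cachetype

import (
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/consul/proto/pbsubscribe"
)

// TestStreamingHealthServices_ResetSketch is an illustrative sketch, not part
// of this change. It drives the mock client through an initial snapshot, a
// server-forced reset signalled via codes.Aborted, and then a fresh snapshot,
// using the helpers added above.
func TestStreamingHealthServices_ResetSketch(t *testing.T) {
	client := NewTestStreamingClient()

	// Initial snapshot: one instance of "web", terminated at index 5.
	reg := newEventServiceHealthRegister(5, 1, "web")
	client.QueueEvents(&reg, newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))

	runStep(t, "initial snapshot", func(t *testing.T) {
		// Fetch via StreamingHealthServices and assert on the returned nodes,
		// exactly as the tests above do. A failure here calls t.FailNow, so
		// the later steps never run against a half-initialized view.
	})

	// The server aborts the stream, for example because the request's ACL
	// token privileges changed. The materialized view maps codes.Aborted to
	// "clear state and resubscribe from scratch" via isGrpcStatus, so the
	// snapshot queued next fully replaces the previous view contents.
	client.QueueErr(status.Error(codes.Aborted, "reset by server"))
	reg2 := newEventServiceHealthRegister(10, 2, "web")
	client.QueueEvents(&reg2, newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 10))

	runStep(t, "snapshot after forced reset", func(t *testing.T) {
		// Fetch again with the same MinIndex and expect only node2's instance.
	})
}

Signalling the reset as a status error rather than a dedicated event lets the subscription loop handle it on the same path as any other stream failure, which is why runSubscription now switches on the error from Recv before inspecting the event payload.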