From edf30b27146bb5f3b6b0ccf76857ae3618079f73 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 1 Oct 2020 02:21:50 -0400 Subject: [PATCH] submatview: Move Materializer to submatview package --- .../cache-types/streaming_health_services.go | 20 ++++++++++--------- .../streaming_health_services_test.go | 12 ++++++++++- .../materializer.go} | 4 ++-- 3 files changed, 24 insertions(+), 12 deletions(-) rename agent/{cache-types/streaming_materialized_view.go => submatview/materializer.go} (99%) diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index d98d919c16..42f58741d3 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -8,6 +8,8 @@ import ( "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/submatview" + "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/agent/cache" @@ -24,13 +26,13 @@ const ( // StreamingHealthServices supports fetching discovering service instances via the // catalog using the streaming gRPC endpoint. type StreamingHealthServices struct { - client StreamingClient + client submatview.StreamingClient logger hclog.Logger } // NewStreamingHealthServices creates a cache-type for watching for service // health results via streaming updates. -func NewStreamingHealthServices(client StreamingClient, logger hclog.Logger) *StreamingHealthServices { +func NewStreamingHealthServices(client submatview.StreamingClient, logger hclog.Logger) *StreamingHealthServices { return &StreamingHealthServices{ client: client, logger: logger, @@ -46,7 +48,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque "Internal cache failure: request wrong type: %T", req) } - r := Request{ + r := submatview.Request{ SubscribeRequest: pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: reqReal.ServiceName, @@ -69,9 +71,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque return view.Fetch(opts) } -func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r Request) (*Materializer, error) { +func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r submatview.Request) (*submatview.Materializer, error) { if opts.LastResult != nil && opts.LastResult.State != nil { - return opts.LastResult.State.(*Materializer), nil + return opts.LastResult.State.(*submatview.Materializer), nil } state, err := newHealthViewState(r.Filter) @@ -79,7 +81,7 @@ func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r return nil, err } ctx, cancel := context.WithCancel(context.TODO()) - view := NewMaterializer(ViewDeps{ + view := submatview.NewMaterializer(submatview.ViewDeps{ State: state, Client: c.client, Logger: c.logger, @@ -93,7 +95,7 @@ func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r Stop: cancel, Done: ctx.Done(), }) - go view.run(ctx) + go view.Run(ctx) return view, nil } @@ -102,7 +104,7 @@ func (c *StreamingHealthServices) SupportsBlocking() bool { return true } -func newHealthViewState(filterExpr string) (View, error) { +func newHealthViewState(filterExpr string) (submatview.View, error) { s := &healthViewState{state: make(map[string]structs.CheckServiceNode)} // We apply filtering to the raw CheckServiceNodes before we are done mutating @@ -115,7 +117,7 @@ func newHealthViewState(filterExpr string) (View, error) { } // StreamingClient implements StreamingCacheType -func (c *StreamingHealthServices) StreamingClient() StreamingClient { +func (c *StreamingHealthServices) StreamingClient() submatview.StreamingClient { return c.client } diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index e2c94277b2..84e8342f3d 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -107,7 +107,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { runStep(t, "reconnects and resumes after transient stream error", func(t *testing.T) { // Use resetErr just because it's "temporary" this is a stand in for any // network error that uses that same interface though. - client.QueueErr(resetErr("broken pipe")) + client.QueueErr(tempError("broken pipe")) // After the error the view should re-subscribe with same index so will get // a "resume stream". @@ -202,6 +202,16 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { }) } +type tempError string + +func (e tempError) Error() string { + return string(e) +} + +func (e tempError) Temporary() bool { + return true +} + // requireResultsSame compares two IndexedCheckServiceNodes without requiring // the same order of results (which vary due to map usage internally). func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) { diff --git a/agent/cache-types/streaming_materialized_view.go b/agent/submatview/materializer.go similarity index 99% rename from agent/cache-types/streaming_materialized_view.go rename to agent/submatview/materializer.go index b92c6ab235..c75126b27c 100644 --- a/agent/cache-types/streaming_materialized_view.go +++ b/agent/submatview/materializer.go @@ -1,4 +1,4 @@ -package cachetype +package submatview import ( "context" @@ -131,7 +131,7 @@ func (v *Materializer) Close() error { return nil } -func (v *Materializer) run(ctx context.Context) { +func (v *Materializer) Run(ctx context.Context) { if ctx.Err() != nil { return }