submatview: Move Materializer to submatview package

This commit is contained in:
Daniel Nephin 2020-10-01 02:21:50 -04:00
parent ed45957ffb
commit edf30b2714
3 changed files with 24 additions and 12 deletions

View File

@ -8,6 +8,8 @@ import (
"github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
@ -24,13 +26,13 @@ const (
// StreamingHealthServices supports fetching discovering service instances via the // StreamingHealthServices supports fetching discovering service instances via the
// catalog using the streaming gRPC endpoint. // catalog using the streaming gRPC endpoint.
type StreamingHealthServices struct { type StreamingHealthServices struct {
client StreamingClient client submatview.StreamingClient
logger hclog.Logger logger hclog.Logger
} }
// NewStreamingHealthServices creates a cache-type for watching for service // NewStreamingHealthServices creates a cache-type for watching for service
// health results via streaming updates. // health results via streaming updates.
func NewStreamingHealthServices(client StreamingClient, logger hclog.Logger) *StreamingHealthServices { func NewStreamingHealthServices(client submatview.StreamingClient, logger hclog.Logger) *StreamingHealthServices {
return &StreamingHealthServices{ return &StreamingHealthServices{
client: client, client: client,
logger: logger, logger: logger,
@ -46,7 +48,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
"Internal cache failure: request wrong type: %T", req) "Internal cache failure: request wrong type: %T", req)
} }
r := Request{ r := submatview.Request{
SubscribeRequest: pbsubscribe.SubscribeRequest{ SubscribeRequest: pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: reqReal.ServiceName, Key: reqReal.ServiceName,
@ -69,9 +71,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
return view.Fetch(opts) return view.Fetch(opts)
} }
func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r Request) (*Materializer, error) { func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r submatview.Request) (*submatview.Materializer, error) {
if opts.LastResult != nil && opts.LastResult.State != nil { if opts.LastResult != nil && opts.LastResult.State != nil {
return opts.LastResult.State.(*Materializer), nil return opts.LastResult.State.(*submatview.Materializer), nil
} }
state, err := newHealthViewState(r.Filter) state, err := newHealthViewState(r.Filter)
@ -79,7 +81,7 @@ func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r
return nil, err return nil, err
} }
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
view := NewMaterializer(ViewDeps{ view := submatview.NewMaterializer(submatview.ViewDeps{
State: state, State: state,
Client: c.client, Client: c.client,
Logger: c.logger, Logger: c.logger,
@ -93,7 +95,7 @@ func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r
Stop: cancel, Stop: cancel,
Done: ctx.Done(), Done: ctx.Done(),
}) })
go view.run(ctx) go view.Run(ctx)
return view, nil return view, nil
} }
@ -102,7 +104,7 @@ func (c *StreamingHealthServices) SupportsBlocking() bool {
return true return true
} }
func newHealthViewState(filterExpr string) (View, error) { func newHealthViewState(filterExpr string) (submatview.View, error) {
s := &healthViewState{state: make(map[string]structs.CheckServiceNode)} s := &healthViewState{state: make(map[string]structs.CheckServiceNode)}
// We apply filtering to the raw CheckServiceNodes before we are done mutating // We apply filtering to the raw CheckServiceNodes before we are done mutating
@ -115,7 +117,7 @@ func newHealthViewState(filterExpr string) (View, error) {
} }
// StreamingClient implements StreamingCacheType // StreamingClient implements StreamingCacheType
func (c *StreamingHealthServices) StreamingClient() StreamingClient { func (c *StreamingHealthServices) StreamingClient() submatview.StreamingClient {
return c.client return c.client
} }

View File

@ -107,7 +107,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
runStep(t, "reconnects and resumes after transient stream error", func(t *testing.T) { runStep(t, "reconnects and resumes after transient stream error", func(t *testing.T) {
// Use resetErr just because it's "temporary" this is a stand in for any // Use resetErr just because it's "temporary" this is a stand in for any
// network error that uses that same interface though. // network error that uses that same interface though.
client.QueueErr(resetErr("broken pipe")) client.QueueErr(tempError("broken pipe"))
// After the error the view should re-subscribe with same index so will get // After the error the view should re-subscribe with same index so will get
// a "resume stream". // a "resume stream".
@ -202,6 +202,16 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
}) })
} }
type tempError string
func (e tempError) Error() string {
return string(e)
}
func (e tempError) Temporary() bool {
return true
}
// requireResultsSame compares two IndexedCheckServiceNodes without requiring // requireResultsSame compares two IndexedCheckServiceNodes without requiring
// the same order of results (which vary due to map usage internally). // the same order of results (which vary due to map usage internally).
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) { func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {

View File

@ -1,4 +1,4 @@
package cachetype package submatview
import ( import (
"context" "context"
@ -131,7 +131,7 @@ func (v *Materializer) Close() error {
return nil return nil
} }
func (v *Materializer) run(ctx context.Context) { func (v *Materializer) Run(ctx context.Context) {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
} }