From 798953f57d31a96ac2b37d283c73d30b6a30432e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 28 Apr 2021 17:31:42 -0400 Subject: [PATCH] Merge pull request #10112 from hashicorp/dnephin/remove-streaming-from-cache streaming: replace agent/cache with submatview.Store --- .changelog/10112.txt | 3 + agent/agent.go | 25 +- agent/agent_test.go | 9 + agent/cache-types/streaming_test.go | 66 -- agent/consul/options.go | 11 +- agent/consul/subscribe_backend.go | 3 +- agent/health_endpoint.go | 7 - agent/proxycfg/manager_test.go | 5 +- agent/rpcclient/health/health.go | 91 +- agent/rpcclient/health/health_test.go | 235 ++++++ agent/rpcclient/health/streaming_test.go | 69 ++ .../health/view.go} | 125 +-- .../health/view_test.go} | 796 ++++++++++-------- agent/setup.go | 45 +- agent/submatview/materializer.go | 99 +-- agent/submatview/store.go | 240 ++++++ agent/submatview/store_test.go | 458 ++++++++++ .../streaming_test.go} | 141 ++-- sdk/testutil/retry/retry.go | 33 +- sdk/testutil/retry/retry_test.go | 60 +- website/content/api-docs/health.mdx | 7 +- 21 files changed, 1796 insertions(+), 732 deletions(-) create mode 100644 .changelog/10112.txt delete mode 100644 agent/cache-types/streaming_test.go create mode 100644 agent/rpcclient/health/health_test.go create mode 100644 agent/rpcclient/health/streaming_test.go rename agent/{cache-types/streaming_health_services.go => rpcclient/health/view.go} (56%) rename agent/{cache-types/streaming_health_services_test.go => rpcclient/health/view_test.go} (54%) create mode 100644 agent/submatview/store.go create mode 100644 agent/submatview/store_test.go rename agent/{cache-types/streaming_events_test.go => submatview/streaming_test.go} (56%) diff --git a/.changelog/10112.txt b/.changelog/10112.txt new file mode 100644 index 0000000000..d725d3fe1c --- /dev/null +++ b/.changelog/10112.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: fixes a bug that would cause context cancellation errors when a cache entry expired while requests were active. +``` diff --git a/agent/agent.go b/agent/agent.go index c6615af4be..21bddae4d6 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -373,15 +373,21 @@ func New(bd BaseDeps) (*Agent, error) { cache: bd.Cache, } - cacheName := cachetype.HealthServicesName - if bd.RuntimeConfig.UseStreamingBackend { - cacheName = cachetype.StreamingHealthServicesName + // TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent + conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) + if err != nil { + return nil, err } + a.rpcClientHealth = &health.Client{ - Cache: bd.Cache, - NetRPC: &a, - CacheName: cacheName, - CacheNameIngress: cachetype.HealthServicesName, + Cache: bd.Cache, + NetRPC: &a, + CacheName: cachetype.HealthServicesName, + ViewStore: bd.ViewStore, + MaterializerDeps: health.MaterializerDeps{ + Conn: conn, + Logger: bd.Logger.Named("rpcclient.health"), + }, } a.serviceManager = NewServiceManager(&a) @@ -533,6 +539,8 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy) } + go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) + // Start the proxy config manager. 
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ Cache: a.cache, @@ -540,7 +548,6 @@ func (a *Agent) Start(ctx context.Context) error { Logger: a.logger.Named(logging.ProxyConfig), State: a.State, Source: &structs.QuerySource{ - Node: a.config.NodeName, Datacenter: a.config.Datacenter, Segment: a.config.SegmentName, }, @@ -1385,6 +1392,8 @@ func (a *Agent) ShutdownAgent() error { a.cache.Close() } + a.rpcClientHealth.Close() + var err error if a.delegate != nil { err = a.delegate.Shutdown() diff --git a/agent/agent_test.go b/agent/agent_test.go index 83d47d25d3..f01e739442 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" + "google.golang.org/grpc" "gopkg.in/square/go-jose.v2/jwt" "github.com/hashicorp/consul/agent/cache" @@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) { Logger: hclog.NewInterceptLogger(nil), Tokens: new(token.Store), TLSConfigurator: tlsConf, + GRPCConnPool: &fakeGRPCConnPool{}, }, RuntimeConfig: &config.RuntimeConfig{ HTTPAddrs: []net.Addr{ @@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) { } } +type fakeGRPCConnPool struct{} + +func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) { + return nil, nil +} + func TestAgent_ReconnectConfigWanDisabled(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) { Logger: hclog.NewInterceptLogger(nil), Tokens: new(token.Store), TLSConfigurator: tlsConf, + GRPCConnPool: &fakeGRPCConnPool{}, }, RuntimeConfig: &config.RuntimeConfig{ HTTPAddrs: []net.Addr{ diff --git a/agent/cache-types/streaming_test.go b/agent/cache-types/streaming_test.go deleted file mode 100644 index b12809c3c5..0000000000 --- a/agent/cache-types/streaming_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package cachetype - -import ( - "context" - "fmt" - - "google.golang.org/grpc" - - "github.com/hashicorp/consul/proto/pbsubscribe" -) - -// TestStreamingClient is a mock StreamingClient for testing that allows -// for queueing up custom events to a subscriber. 
-type TestStreamingClient struct { - pbsubscribe.StateChangeSubscription_SubscribeClient - events chan eventOrErr - ctx context.Context - expectedNamespace string -} - -type eventOrErr struct { - Err error - Event *pbsubscribe.Event -} - -func NewTestStreamingClient(ns string) *TestStreamingClient { - return &TestStreamingClient{ - events: make(chan eventOrErr, 32), - expectedNamespace: ns, - } -} - -func (t *TestStreamingClient) Subscribe( - ctx context.Context, - req *pbsubscribe.SubscribeRequest, - _ ...grpc.CallOption, -) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { - if req.Namespace != t.expectedNamespace { - return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v", - req.Namespace, t.expectedNamespace) - } - t.ctx = ctx - return t, nil -} - -func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { - for _, e := range events { - t.events <- eventOrErr{Event: e} - } -} - -func (t *TestStreamingClient) QueueErr(err error) { - t.events <- eventOrErr{Err: err} -} - -func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) { - select { - case eoe := <-t.events: - if eoe.Err != nil { - return nil, eoe.Err - } - return eoe.Event, nil - case <-t.ctx.Done(): - return nil, t.ctx.Err() - } -} diff --git a/agent/consul/options.go b/agent/consul/options.go index 12507cb855..9c75a7b339 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -1,12 +1,13 @@ package consul import ( - "github.com/hashicorp/consul/agent/grpc" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-hclog" ) type Deps struct { @@ -15,5 +16,9 @@ type Deps struct { Tokens *token.Store Router *router.Router ConnPool *pool.ConnPool - GRPCConnPool *grpc.ClientConnPool + GRPCConnPool GRPCClientConner +} + +type GRPCClientConner interface { + ClientConn(datacenter string) (*grpc.ClientConn, error) } diff --git a/agent/consul/subscribe_backend.go b/agent/consul/subscribe_backend.go index d1888911d8..851b14aa07 100644 --- a/agent/consul/subscribe_backend.go +++ b/agent/consul/subscribe_backend.go @@ -5,14 +5,13 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" - agentgrpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/rpc/subscribe" "github.com/hashicorp/consul/agent/structs" ) type subscribeBackend struct { srv *Server - connPool *agentgrpc.ClientConnPool + connPool GRPCClientConner } // TODO: refactor Resolve methods to an ACLBackend that can be used by all diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index cdfccd39f2..d8d5c65f20 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,13 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress - args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) - - if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" { - return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"} - } - out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args) if err != nil { return nil, err diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go 
index c4496b6d3f..1c9067f504 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -352,10 +352,7 @@ func testManager_BasicLifecycle( require := require.New(t) logger := testutil.Logger(t) state := local.NewState(local.Config{}, logger, &token.Store{}) - source := &structs.QuerySource{ - Node: "node1", - Datacenter: "dc1", - } + source := &structs.QuerySource{Datacenter: "dc1"} // Stub state syncing state.TriggerSyncChanges = func() {} diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 1ef29488ff..b8b023e774 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -5,16 +5,18 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" + "github.com/hashicorp/consul/proto/pbsubscribe" ) +// Client provides access to service health data. type Client struct { - NetRPC NetRPC - Cache CacheGetter - // CacheName to use for service health. - CacheName string - // CacheNameIngress is the name of the cache type to use for ingress - // service health. - CacheNameIngress string + NetRPC NetRPC + Cache CacheGetter + ViewStore MaterializedViewStore + MaterializerDeps MaterializerDeps + CacheName string + UseStreamingBackend bool } type NetRPC interface { @@ -26,10 +28,23 @@ type CacheGetter interface { Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error } +type MaterializedViewStore interface { + Get(ctx context.Context, req submatview.Request) (submatview.Result, error) + Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error +} + func (c *Client) ServiceNodes( ctx context.Context, req structs.ServiceSpecificRequest, ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { + if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) { + result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req)) + if err != nil { + return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err + } + return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err + } + out, md, err := c.getServiceNodes(ctx, req) if err != nil { return out, md, err @@ -50,18 +65,12 @@ func (c *Client) getServiceNodes( req structs.ServiceSpecificRequest, ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { var out structs.IndexedCheckServiceNodes - if !req.QueryOptions.UseCache { err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out) return out, cache.ResultMeta{}, err } - cacheName := c.CacheName - if req.Ingress { - cacheName = c.CacheNameIngress - } - - raw, md, err := c.Cache.Get(ctx, cacheName, &req) + raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) if err != nil { return out, md, err } @@ -80,9 +89,55 @@ func (c *Client) Notify( correlationID string, ch chan<- cache.UpdateEvent, ) error { - cacheName := c.CacheName - if req.Ingress { - cacheName = c.CacheNameIngress + if c.useStreaming(req) { + sr := c.newServiceRequest(req) + return c.ViewStore.Notify(ctx, sr, correlationID, ch) } - return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch) + + return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) +} + +func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { + return c.UseStreamingBackend && !req.Ingress && req.Source.Node == "" +} + +func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest { + return 
serviceRequest{ + ServiceSpecificRequest: req, + deps: c.MaterializerDeps, + } +} + +// Close any underlying connections used by the client. +func (c *Client) Close() error { + if c == nil { + return nil + } + return c.MaterializerDeps.Conn.Close() +} + +type serviceRequest struct { + structs.ServiceSpecificRequest + deps MaterializerDeps +} + +func (r serviceRequest) CacheInfo() cache.RequestInfo { + return r.ServiceSpecificRequest.CacheInfo() +} + +func (r serviceRequest) Type() string { + return "service-health" +} + +func (r serviceRequest) NewMaterializer() (*submatview.Materializer, error) { + view, err := newHealthView(r.ServiceSpecificRequest) + if err != nil { + return nil, err + } + return submatview.NewMaterializer(submatview.Deps{ + View: view, + Client: pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn), + Logger: r.deps.Logger, + Request: newMaterializerRequest(r.ServiceSpecificRequest), + }), nil } diff --git a/agent/rpcclient/health/health_test.go b/agent/rpcclient/health/health_test.go new file mode 100644 index 0000000000..09da967bde --- /dev/null +++ b/agent/rpcclient/health/health_test.go @@ -0,0 +1,235 @@ +package health + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" +) + +func TestClient_ServiceNodes_BackendRouting(t *testing.T) { + type testCase struct { + name string + req structs.ServiceSpecificRequest + expected func(t *testing.T, c *Client) + } + + run := func(t *testing.T, tc testCase) { + c := &Client{ + NetRPC: &fakeNetRPC{}, + Cache: &fakeCache{}, + ViewStore: &fakeViewStore{}, + CacheName: "cache-no-streaming", + UseStreamingBackend: true, + } + + _, _, err := c.ServiceNodes(context.Background(), tc.req) + require.NoError(t, err) + tc.expected(t, c) + } + + var testCases = []testCase{ + { + name: "rpc by default", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + }, + expected: useRPC, + }, + { + name: "use streaming instead of cache", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{UseCache: true}, + }, + expected: useStreaming, + }, + { + name: "use streaming for MinQueryIndex", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, + }, + expected: useStreaming, + }, + { + name: "use cache for ingress request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{UseCache: true}, + Ingress: true, + }, + expected: useCache, + }, + { + name: "use cache for near request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + QueryOptions: structs.QueryOptions{UseCache: true}, + Source: structs.QuerySource{Node: "node1"}, + }, + expected: useCache, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func useRPC(t *testing.T, c *Client) { + t.Helper() + + rpc, ok := c.NetRPC.(*fakeNetRPC) + require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) + + cache, ok := c.Cache.(*fakeCache) + require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) + + store, ok := c.ViewStore.(*fakeViewStore) + require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) + + require.Len(t, 
cache.calls, 0) + require.Len(t, store.calls, 0) + require.Equal(t, []string{"Health.ServiceNodes"}, rpc.calls) +} + +func useStreaming(t *testing.T, c *Client) { + t.Helper() + + rpc, ok := c.NetRPC.(*fakeNetRPC) + require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) + + cache, ok := c.Cache.(*fakeCache) + require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) + + store, ok := c.ViewStore.(*fakeViewStore) + require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) + + require.Len(t, cache.calls, 0) + require.Len(t, rpc.calls, 0) + require.Len(t, store.calls, 1) +} + +func useCache(t *testing.T, c *Client) { + t.Helper() + + rpc, ok := c.NetRPC.(*fakeNetRPC) + require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC) + + cache, ok := c.Cache.(*fakeCache) + require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache) + + store, ok := c.ViewStore.(*fakeViewStore) + require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore) + + require.Len(t, rpc.calls, 0) + require.Len(t, store.calls, 0) + require.Equal(t, []string{"cache-no-streaming"}, cache.calls) +} + +type fakeCache struct { + calls []string +} + +func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) { + f.calls = append(f.calls, t) + result := &structs.IndexedCheckServiceNodes{} + return result, cache.ResultMeta{}, nil +} + +func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error { + f.calls = append(f.calls, t) + return nil +} + +type fakeNetRPC struct { + calls []string +} + +func (f *fakeNetRPC) RPC(method string, _ interface{}, _ interface{}) error { + f.calls = append(f.calls, method) + return nil +} + +type fakeViewStore struct { + calls []submatview.Request +} + +func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) { + f.calls = append(f.calls, req) + return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil +} + +func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error { + f.calls = append(f.calls, req) + return nil +} + +func TestClient_Notify_BackendRouting(t *testing.T) { + type testCase struct { + name string + req structs.ServiceSpecificRequest + expected func(t *testing.T, c *Client) + } + + run := func(t *testing.T, tc testCase) { + c := &Client{ + NetRPC: &fakeNetRPC{}, + Cache: &fakeCache{}, + ViewStore: &fakeViewStore{}, + CacheName: "cache-no-streaming", + UseStreamingBackend: true, + } + + err := c.Notify(context.Background(), tc.req, "cid", nil) + require.NoError(t, err) + tc.expected(t, c) + } + + var testCases = []testCase{ + { + name: "streaming by default", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + }, + expected: useStreaming, + }, + { + name: "use cache for ingress request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + Ingress: true, + }, + expected: useCache, + }, + { + name: "use cache for near request", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + Source: structs.QuerySource{Node: "node1"}, + }, + expected: useCache, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/agent/rpcclient/health/streaming_test.go 
b/agent/rpcclient/health/streaming_test.go new file mode 100644 index 0000000000..a55fac25b4 --- /dev/null +++ b/agent/rpcclient/health/streaming_test.go @@ -0,0 +1,69 @@ +package health + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/hashicorp/consul/proto/pbsubscribe" +) + +// streamClient is a mock StreamingClient for testing that allows +// for queueing up custom events to a subscriber. +type streamClient struct { + pbsubscribe.StateChangeSubscription_SubscribeClient + subFn func(*pbsubscribe.SubscribeRequest) error + events chan eventOrErr + ctx context.Context +} + +type eventOrErr struct { + Err error + Event *pbsubscribe.Event +} + +func newStreamClient(sub func(req *pbsubscribe.SubscribeRequest) error) *streamClient { + if sub == nil { + sub = func(*pbsubscribe.SubscribeRequest) error { + return nil + } + } + return &streamClient{ + events: make(chan eventOrErr, 32), + subFn: sub, + } +} + +func (t *streamClient) Subscribe( + ctx context.Context, + req *pbsubscribe.SubscribeRequest, + _ ...grpc.CallOption, +) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { + if err := t.subFn(req); err != nil { + return nil, err + } + t.ctx = ctx + return t, nil +} + +func (t *streamClient) QueueEvents(events ...*pbsubscribe.Event) { + for _, e := range events { + t.events <- eventOrErr{Event: e} + } +} + +func (t *streamClient) QueueErr(err error) { + t.events <- eventOrErr{Err: err} +} + +func (t *streamClient) Recv() (*pbsubscribe.Event, error) { + select { + case eoe := <-t.events: + if eoe.Err != nil { + return nil, eoe.Err + } + return eoe.Event, nil + case <-t.ctx.Done(): + return nil, t.ctx.Err() + } +} diff --git a/agent/cache-types/streaming_health_services.go b/agent/rpcclient/health/view.go similarity index 56% rename from agent/cache-types/streaming_health_services.go rename to agent/rpcclient/health/view.go index 8305c54e5c..a648686c45 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/rpcclient/health/view.go @@ -1,74 +1,27 @@ -package cachetype +package health import ( - "context" "fmt" "reflect" "sort" "strings" - "time" "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" + "google.golang.org/grpc" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/agent/submatview" - "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" ) -const ( - // Recommended name for registration. - StreamingHealthServicesName = "streaming-health-services" -) - -// StreamingHealthServices supports fetching discovering service instances via the -// catalog using the streaming gRPC endpoint. -type StreamingHealthServices struct { - RegisterOptionsBlockingRefresh - deps MaterializerDeps -} - -// RegisterOptions returns options with a much shorter LastGetTTL than the default. -// Unlike other cache-types, StreamingHealthServices runs a materialized view in -// the background which will receive streamed events from a server. If the cache -// is not being used, that stream uses memory on the server and network transfer -// between the client and the server. -// The materialize view and the stream are stopped when the cache entry expires, -// so using a shorter TTL ensures the cache entry expires sooner. 
-func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions { - opts := c.RegisterOptionsBlockingRefresh.RegisterOptions() - opts.LastGetTTL = 20 * time.Minute - return opts -} - -// NewStreamingHealthServices creates a cache-type for watching for service -// health results via streaming updates. -func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices { - return &StreamingHealthServices{deps: deps} -} - type MaterializerDeps struct { - Client submatview.StreamClient + Conn *grpc.ClientConn Logger hclog.Logger } -// Fetch service health from the materialized view. If no materialized view -// exists, create one and start it running in a goroutine. The goroutine will -// exit when the cache entry storing the result is expired, the cache will call -// Close on the result.State. -// -// Fetch implements part of the cache.Type interface, and assumes that the -// caller ensures that only a single call to Fetch is running at any time. -func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { - if opts.LastResult != nil && opts.LastResult.State != nil { - return opts.LastResult.State.(*streamingHealthState).Fetch(opts) - } - - srvReq := req.(*structs.ServiceSpecificRequest) - newReqFn := func(index uint64) pbsubscribe.SubscribeRequest { +func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) pbsubscribe.SubscribeRequest { + return func(index uint64) pbsubscribe.SubscribeRequest { req := pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: srvReq.ServiceName, @@ -82,69 +35,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque } return req } - - materializer, err := newMaterializer(c.deps, newReqFn, srvReq) - if err != nil { - return cache.FetchResult{}, err - } - ctx, cancel := context.WithCancel(context.TODO()) - go materializer.Run(ctx) - - state := &streamingHealthState{ - materializer: materializer, - done: ctx.Done(), - cancel: cancel, - } - return state.Fetch(opts) } -func newMaterializer( - deps MaterializerDeps, - newRequestFn func(uint64) pbsubscribe.SubscribeRequest, - req *structs.ServiceSpecificRequest, -) (*submatview.Materializer, error) { - view, err := newHealthView(req) - if err != nil { - return nil, err - } - return submatview.NewMaterializer(submatview.Deps{ - View: view, - Client: deps.Client, - Logger: deps.Logger, - Waiter: &retry.Waiter{ - MinFailures: 1, - // Start backing off with small increments (200-400ms) which will double - // each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000 - // after that). (retry.Wait applies Max limit after jitter right now). - Factor: 200 * time.Millisecond, - MinWait: 0, - MaxWait: 60 * time.Second, - Jitter: retry.NewJitter(100), - }, - Request: newRequestFn, - }), nil -} - -// streamingHealthState wraps a Materializer to manage its lifecycle, and to -// add itself to the FetchResult.State. 
-type streamingHealthState struct { - materializer *submatview.Materializer - done <-chan struct{} - cancel func() -} - -func (s *streamingHealthState) Close() error { - s.cancel() - return nil -} - -func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) { - result, err := s.materializer.Fetch(s.done, opts) - result.State = s - return result, err -} - -func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) { +func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) { fe, err := newFilterEvaluator(req) if err != nil { return nil, err @@ -197,7 +90,7 @@ type filterEvaluator interface { Evaluate(datum interface{}) (bool, error) } -func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) { +func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) { var evaluators []filterEvaluator typ := reflect.TypeOf(structs.CheckServiceNode{}) @@ -274,7 +167,7 @@ func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) { } // Result returns the structs.IndexedCheckServiceNodes stored by this view. -func (s *healthView) Result(index uint64) (interface{}, error) { +func (s *healthView) Result(index uint64) interface{} { result := structs.IndexedCheckServiceNodes{ Nodes: make(structs.CheckServiceNodes, 0, len(s.state)), QueryMeta: structs.QueryMeta{ @@ -286,7 +179,7 @@ func (s *healthView) Result(index uint64) (interface{}, error) { } sortCheckServiceNodes(&result) - return &result, nil + return &result } func (s *healthView) Reset() { diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/rpcclient/health/view_test.go similarity index 54% rename from agent/cache-types/streaming_health_services_test.go rename to agent/rpcclient/health/view_test.go index 9d7193a424..bdc59ad520 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/rpcclient/health/view_test.go @@ -1,238 +1,29 @@ -package cachetype +package health import ( + "context" "errors" "fmt" "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/proto/pbcommon" + "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" ) -func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - namespace := pbcommon.DefaultEnterpriseMeta.Namespace - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} - - // Initially there are no services registered. Server should send an - // EndOfSnapshot message immediately with index of 1. 
- client.QueueEvents(newEndOfSnapshotEvent(1)) - - opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - } - empty := &structs.IndexedCheckServiceNodes{ - Nodes: structs.CheckServiceNodes{}, - QueryMeta: structs.QueryMeta{ - Index: 1, - }, - } - - runStep(t, "empty snapshot returned", func(t *testing.T) { - // Fetch should return an empty - // result of the right type with a non-zero index, and in the background begin - // streaming updates. - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - - require.Equal(t, uint64(1), result.Index) - require.Equal(t, empty, result.Value) - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "blocks for timeout", func(t *testing.T) { - // Subsequent fetch should block for the timeout - start := time.Now() - opts.Timeout = 200 * time.Millisecond - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until timeout") - - require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") - require.Equal(t, empty, result.Value, "result value should not have changed") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "blocks until update", func(t *testing.T) { - // Make another blocking query with a longer timeout and trigger an update - // event part way through. - start := time.Now() - go func() { - time.Sleep(200 * time.Millisecond) - client.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) - }() - - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until the event was delivered") - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, uint64(4), result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1, - "result value should contain the new registration") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) { - client.QueueErr(tempError("broken pipe")) - - // Next fetch will continue to block until timeout and receive the same - // result. 
- start := time.Now() - opts.Timeout = 200 * time.Millisecond - result, err := typ.Fetch(opts, req) - require.NoError(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until timeout") - - require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") - require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed") - - opts.MinIndex = result.Index - opts.LastResult = &result - - // But an update should still be noticed due to reconnection - client.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) - - start = time.Now() - opts.Timeout = time.Second - result, err = typ.Fetch(opts, req) - require.NoError(t, err) - elapsed = time.Since(start) - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, uint64(10), result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2, - "result value should contain the new registration") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) - - runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { - // Wait and send the error while fetcher is waiting - go func() { - time.Sleep(200 * time.Millisecond) - client.QueueErr(errors.New("invalid request")) - }() - - // Next fetch should return the error - start := time.Now() - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) - require.Error(t, err) - elapsed := time.Since(start) - require.True(t, elapsed >= 200*time.Millisecond, - "Fetch should have blocked until error was sent") - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed") - // We don't require instances to be returned in same order so we use - // elementsMatch which is recursive. - requireResultsSame(t, - opts.LastResult.Value.(*structs.IndexedCheckServiceNodes), - result.Value.(*structs.IndexedCheckServiceNodes), - ) - - opts.MinIndex = result.Index - opts.LastResult = &result - - // But an update should still be noticed due to reconnection - client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web")) - - opts.Timeout = time.Second - result, err = typ.Fetch(opts, req) - require.NoError(t, err) - elapsed = time.Since(start) - require.True(t, elapsed < time.Second, - "Fetch should have returned before the timeout") - - require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, - "result value should contain the new registration") - - opts.MinIndex = result.Index - opts.LastResult = &result - }) -} - -type tempError string - -func (e tempError) Error() string { - return string(e) -} - -func (e tempError) Temporary() bool { - return true -} - -// requireResultsSame compares two IndexedCheckServiceNodes without requiring -// the same order of results (which vary due to map usage internally). 
-func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) { - require.Equal(t, want.Index, got.Index) - - svcIDs := func(csns structs.CheckServiceNodes) []string { - res := make([]string, 0, len(csns)) - for _, csn := range csns { - res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID)) - } - return res - } - - gotIDs := svcIDs(got.Nodes) - wantIDs := svcIDs(want.Nodes) - - require.ElementsMatch(t, wantIDs, gotIDs) -} - -// getNamespace returns a namespace if namespace support exists, otherwise -// returns the empty string. It allows the same tests to work in both oss and ent -// without duplicating the tests. -func getNamespace(ns string) string { - meta := structs.NewEnterpriseMeta(ns) - return meta.NamespaceOrEmpty() -} - -func TestOrderingConsistentWithMemDb(t *testing.T) { +func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) { index := uint64(42) buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode { newID, err := uuid.GenerateUUID() @@ -270,29 +61,205 @@ func TestOrderingConsistentWithMemDb(t *testing.T) { two := buildTestNode("node1", "testService:2") three := buildTestNode("node2", "testService") result := structs.IndexedCheckServiceNodes{ - Nodes: structs.CheckServiceNodes{ - three, two, zero, one, - }, - QueryMeta: structs.QueryMeta{ - Index: index, - }, + Nodes: structs.CheckServiceNodes{three, two, zero, one}, + QueryMeta: structs.QueryMeta{Index: index}, } sortCheckServiceNodes(&result) expected := structs.CheckServiceNodes{zero, one, two, three} require.Equal(t, expected, result.Nodes) } -func TestStreamingHealthServices_FullSnapshot(t *testing.T) { +func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace) + streamClient := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) + go store.Run(ctx) + + // Initially there are no services registered. Server should send an + // EndOfSnapshot message immediately with index of 1. 
+ streamClient.QueueEvents(newEndOfSnapshotEvent(1)) + + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, + }, + }, + streamClient: streamClient, + } + empty := &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{}, + QueryMeta: structs.QueryMeta{ + Index: 1, + }, + } + + runStep(t, "empty snapshot returned", func(t *testing.T) { + result, err := store.Get(ctx, req) + require.NoError(t, err) + + require.Equal(t, uint64(1), result.Index) + require.Equal(t, empty, result.Value) + + req.QueryOptions.MinQueryIndex = result.Index + }) + + runStep(t, "blocks for timeout", func(t *testing.T) { + // Subsequent fetch should block for the timeout + start := time.Now() + req.QueryOptions.MaxQueryTime = 200 * time.Millisecond + result, err := store.Get(ctx, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed") + require.Equal(t, empty, result.Value, "result value should not have changed") + + req.QueryOptions.MinQueryIndex = result.Index + }) + + var lastResultValue structs.CheckServiceNodes + + runStep(t, "blocks until update", func(t *testing.T) { + // Make another blocking query with a longer timeout and trigger an update + // event part way through. + start := time.Now() + go func() { + time.Sleep(200 * time.Millisecond) + streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) + }() + + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until the event was delivered") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(4), result.Index, "result index should not have changed") + lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes + require.Len(t, lastResultValue, 1, + "result value should contain the new registration") + + req.QueryOptions.MinQueryIndex = result.Index + }) + + runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) { + streamClient.QueueErr(tempError("broken pipe")) + + // Next fetch will continue to block until timeout and receive the same + // result. 
+ start := time.Now() + req.QueryOptions.MaxQueryTime = 200 * time.Millisecond + result, err := store.Get(ctx, req) + require.NoError(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, + "result index should not have changed") + require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, + "result value should not have changed") + + req.QueryOptions.MinQueryIndex = result.Index + + // But an update should still be noticed due to reconnection + streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) + + start = time.Now() + req.QueryOptions.MaxQueryTime = time.Second + result, err = store.Get(ctx, req) + require.NoError(t, err) + elapsed = time.Since(start) + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, uint64(10), result.Index, "result index should not have changed") + lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes + require.Len(t, lastResultValue, 2, + "result value should contain the new registration") + + req.QueryOptions.MinQueryIndex = result.Index + }) + + runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { + // Wait and send the error while fetcher is waiting + go func() { + time.Sleep(200 * time.Millisecond) + streamClient.QueueErr(errors.New("invalid request")) + }() + + // Next fetch should return the error + start := time.Now() + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) + require.Error(t, err) + elapsed := time.Since(start) + require.True(t, elapsed >= 200*time.Millisecond, + "Fetch should have blocked until error was sent") + require.True(t, elapsed < time.Second, + "Fetch should have returned before the timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed") + require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes) + + req.QueryOptions.MinQueryIndex = result.Index + + // But an update should still be noticed due to reconnection + streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web")) + + req.QueryOptions.MaxQueryTime = time.Second + result, err = store.Get(ctx, req) + require.NoError(t, err) + elapsed = time.Since(start) + require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") + + require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed") + require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, + "result value should contain the new registration") + + req.QueryOptions.MinQueryIndex = result.Index + }) +} + +type tempError string + +func (e tempError) Error() string { + return string(e) +} + +func (e tempError) Temporary() bool { + return true +} + +func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } namespace := getNamespace("ns2") - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} + client := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) // Create an initial snapshot of 3 instances on different 
nodes registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event { @@ -304,37 +271,28 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { registerServiceWeb(5, 3), newEndOfSnapshotEvent(5)) - // This contains the view state so important we share it between calls. - opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: 1 * time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - } - - gatherNodes := func(res interface{}) []string { - nodes := make([]string, 0, 3) - r := res.(*structs.IndexedCheckServiceNodes) - for _, csn := range r.Nodes { - nodes = append(nodes, csn.Node.Node) - } - // Result will be sorted alphabetically the same way as memdb - return nodes + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, + }, + }, + streamClient: client, } runStep(t, "full snapshot returned", func(t *testing.T) { - result, err := typ.Fetch(opts, req) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - require.ElementsMatch(t, []string{"node1", "node2", "node3"}, - gatherNodes(result.Value)) + expected := newExpectedNodes("node1", "node2", "node3") + expected.Index = 5 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "blocks until deregistration", func(t *testing.T) { @@ -348,8 +306,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web")) }() - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) elapsed := time.Since(start) require.True(t, elapsed >= 200*time.Millisecond, @@ -358,10 +316,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { "Fetch should have returned before the timeout") require.Equal(t, uint64(20), result.Index) - require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node2", "node3") + expected.Index = 20 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "server reload is respected", func(t *testing.T) { @@ -379,18 +338,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { // Make another blocking query with THE SAME index. It should immediately // return the new snapshot. 
start := time.Now() - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) elapsed := time.Since(start) require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") require.Equal(t, uint64(50), result.Index) - require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node3", "node4", "node5") + expected.Index = 50 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) { @@ -404,26 +364,54 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { newEndOfSnapshotEvent(50)) start := time.Now() - opts.MinIndex = 49 - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MinQueryIndex = 49 + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) elapsed := time.Since(start) require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") require.Equal(t, uint64(50), result.Index) - require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node3", "node4", "node5") + expected.Index = 50 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) }) } -func TestStreamingHealthServices_EventBatches(t *testing.T) { +func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { + result := &structs.IndexedCheckServiceNodes{} + for _, node := range nodes { + result.Nodes = append(result.Nodes, structs.CheckServiceNode{ + Node: &structs.Node{Node: node}, + }) + } + return result +} + +// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode +// by Node name. +var cmpCheckServiceNodeNames = cmp.Options{ + cmp.Comparer(func(x, y structs.CheckServiceNode) bool { + return x.Node.Node == y.Node.Node + }), +} + +func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { + t.Helper() + if diff := cmp.Diff(x, y, opts...); diff != "" { + t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) + } +} + +func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { namespace := getNamespace("ns3") - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} + client := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( @@ -434,36 +422,28 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { batchEv, newEndOfSnapshotEvent(5)) - // This contains the view state so important we share it between calls. 
- opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: 1 * time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - } - - gatherNodes := func(res interface{}) []string { - nodes := make([]string, 0, 3) - r := res.(*structs.IndexedCheckServiceNodes) - for _, csn := range r.Nodes { - nodes = append(nodes, csn.Node.Node) - } - return nodes + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, + }, + }, + streamClient: client, } runStep(t, "full snapshot returned", func(t *testing.T) { - result, err := typ.Fetch(opts, req) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - require.ElementsMatch(t, []string{"node1", "node2", "node3"}, - gatherNodes(result.Value)) - opts.MinIndex = result.Index - opts.LastResult = &result + expected := newExpectedNodes("node1", "node2", "node3") + expected.Index = 5 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "batched updates work too", func(t *testing.T) { @@ -476,92 +456,220 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { newEventServiceHealthRegister(20, 4, "web"), ) client.QueueEvents(batchEv) - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + req.QueryOptions.MaxQueryTime = time.Second + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(20), result.Index) - require.ElementsMatch(t, []string{"node2", "node3", "node4"}, - gatherNodes(result.Value)) + expected := newExpectedNodes("node2", "node3", "node4") + expected.Index = 20 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) } -func TestStreamingHealthServices_Filtering(t *testing.T) { +func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { namespace := getNamespace("ns3") - client := NewTestStreamingClient(namespace) - typ := StreamingHealthServices{deps: MaterializerDeps{ - Client: client, - Logger: hclog.Default(), - }} + streamClient := newStreamClient(validateNamespace(namespace)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := submatview.NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := serviceRequestStub{ + serviceRequest: serviceRequest{ + ServiceSpecificRequest: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.NewEnterpriseMeta(namespace), + QueryOptions: structs.QueryOptions{ + Filter: `Node.Node == "node2"`, + MaxQueryTime: time.Second, + }, + }, + }, + streamClient: streamClient, + } // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( newEventServiceHealthRegister(5, 1, "web"), newEventServiceHealthRegister(5, 2, "web"), newEventServiceHealthRegister(5, 3, "web")) - client.QueueEvents( + streamClient.QueueEvents( batchEv, newEndOfSnapshotEvent(5)) - // This contains the view state so important we share it between calls. 
- opts := cache.FetchOptions{ - MinIndex: 0, - Timeout: 1 * time.Second, - } - req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - EnterpriseMeta: structs.NewEnterpriseMeta(namespace), - QueryOptions: structs.QueryOptions{ - Filter: `Node.Node == "node2"`, - }, - } - - gatherNodes := func(res interface{}) []string { - nodes := make([]string, 0, 3) - r := res.(*structs.IndexedCheckServiceNodes) - for _, csn := range r.Nodes { - nodes = append(nodes, csn.Node.Node) - } - return nodes - } - runStep(t, "filtered snapshot returned", func(t *testing.T) { - result, err := typ.Fetch(opts, req) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - require.Equal(t, []string{"node2"}, gatherNodes(result.Value)) + expected := newExpectedNodes("node2") + expected.Index = 5 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) - opts.MinIndex = result.Index - opts.LastResult = &result + req.QueryOptions.MinQueryIndex = result.Index }) runStep(t, "filtered updates work too", func(t *testing.T) { - // Simulate multiple registrations happening in one Txn (so all have same - // index) + // Simulate multiple registrations happening in one Txn (all have same index) batchEv := newEventBatchWithEvents( // Deregister an existing node newEventServiceHealthDeregister(20, 1, "web"), // Register another newEventServiceHealthRegister(20, 4, "web"), ) - client.QueueEvents(batchEv) - opts.Timeout = time.Second - result, err := typ.Fetch(opts, req) + streamClient.QueueEvents(batchEv) + result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(20), result.Index) - require.Equal(t, []string{"node2"}, gatherNodes(result.Value)) - - opts.MinIndex = result.Index - opts.LastResult = &result + expected := newExpectedNodes("node2") + expected.Index = 20 + assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) }) } +// serviceRequestStub overrides NewMaterializer so that test can use a fake +// StreamClient. 
+type serviceRequestStub struct { + serviceRequest + streamClient submatview.StreamClient +} + +func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) { + view, err := newHealthView(r.ServiceSpecificRequest) + if err != nil { + return nil, err + } + return submatview.NewMaterializer(submatview.Deps{ + View: view, + Client: r.streamClient, + Logger: hclog.New(nil), + Request: newMaterializerRequest(r.ServiceSpecificRequest), + }), nil +} + +func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { + node := fmt.Sprintf("node%d", nodeNum) + nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) + addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) + + return &pbsubscribe.Event{ + Index: index, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + ID: nodeID, + Node: node, + Address: addr, + Datacenter: "dc1", + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &pbservice.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + RaftIndex: pbcommon.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + }, + }, + }, + } +} + +func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { + node := fmt.Sprintf("node%d", nodeNum) + + return &pbsubscribe.Event{ + Index: index, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Deregister, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: node, + }, + Service: &pbservice.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: pbcommon.RaftIndex{ + // The original insertion index since a delete doesn't update + // this. This magic value came from state store tests where we + // setup at index 10 and then mutate at index 100. It can be + // modified by the caller later and makes it easier than having + // yet another argument in the common case. + CreateIndex: 10, + ModifyIndex: 10, + }, + }, + }, + }, + }, + } +} + +func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event { + events := make([]*pbsubscribe.Event, len(evs)+1) + events[0] = first + for i := range evs { + events[i+1] = evs[i] + } + return &pbsubscribe.Event{ + Index: first.Index, + Payload: &pbsubscribe.Event_EventBatch{ + EventBatch: &pbsubscribe.EventBatch{Events: events}, + }, + } +} + +func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { + return &pbsubscribe.Event{ + Index: index, + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + } +} + +func newNewSnapshotToFollowEvent() *pbsubscribe.Event { + return &pbsubscribe.Event{ + Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, + } +} + +// getNamespace returns a namespace if namespace support exists, otherwise +// returns the empty string. It allows the same tests to work in both oss and ent +// without duplicating the tests. 
+func getNamespace(ns string) string { + meta := structs.NewEnterpriseMeta(ns) + return meta.NamespaceOrEmpty() +} + +func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error { + return func(request *pbsubscribe.SubscribeRequest) error { + if request.Namespace != ns { + return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace) + } + return nil + } +} + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) { @@ -578,7 +686,7 @@ func TestNewFilterEvaluator(t *testing.T) { } fn := func(t *testing.T, tc testCase) { - e, err := newFilterEvaluator(&tc.req) + e, err := newFilterEvaluator(tc.req) require.NoError(t, err) actual, err := e.Evaluate(tc.data) require.NoError(t, err) diff --git a/agent/setup.go b/agent/setup.go index 9a84e7d247..291c26862a 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -8,31 +8,27 @@ import ( "sync" "time" - "github.com/hashicorp/consul/agent/consul/fsm" - "github.com/armon/go-metrics/prometheus" - - "github.com/hashicorp/consul/agent/consul/usagemetrics" - "github.com/hashicorp/consul/agent/local" - "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" grpcresolver "google.golang.org/grpc/resolver" autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" - cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/grpc/resolver" + "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" + "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/tlsutil" ) @@ -46,6 +42,7 @@ type BaseDeps struct { MetricsHandler MetricsHandler AutoConfig *autoconf.AutoConfig // TODO: use an interface Cache *cache.Cache + ViewStore *submatview.Store } // MetricsHandler provides an http.Handler for displaying metrics. @@ -69,7 +66,10 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) if err != nil { return d, err } - grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger)) + + grpcLogInitOnce.Do(func() { + grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger)) + }) for _, w := range result.Warnings { d.Logger.Warn(w) @@ -100,6 +100,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) cfg.Cache.Logger = d.Logger.Named("cache") // cache-types are not registered yet, but they won't be used until the components are started. d.Cache = cache.New(cfg.Cache) + d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore")) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) builder := resolver.NewServerResolverBuilder(resolver.Config{}) @@ -122,32 +123,12 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) return d, err } - if err := registerCacheTypes(d); err != nil { - return d, err - } - return d, nil } -// registerCacheTypes on bd.Cache. -// -// Note: most cache types are still registered in Agent.registerCache. 
This -// function is for registering newer cache-types which no longer have a dependency -// on Agent. -func registerCacheTypes(bd BaseDeps) error { - if bd.RuntimeConfig.UseStreamingBackend { - conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) - if err != nil { - return err - } - matDeps := cachetype.MaterializerDeps{ - Client: pbsubscribe.NewStateChangeSubscriptionClient(conn), - Logger: bd.Logger, - } - bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps)) - } - return nil -} +// grpcLogInitOnce because the test suite will call NewBaseDeps in many tests and +// causes data races when it is re-initialized. +var grpcLogInitOnce sync.Once func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool { var rpcSrcAddr *net.TCPAddr diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index b5a5157541..51402987dc 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -10,7 +10,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/proto/pbsubscribe" ) @@ -32,7 +31,7 @@ type View interface { // separately and passed in in case the return type needs an Index field // populating. This allows implementations to not worry about maintaining // indexes seen during Update. - Result(index uint64) (interface{}, error) + Result(index uint64) interface{} // Reset the view to the zero state, done in preparation for receiving a new // snapshot. @@ -80,6 +79,18 @@ func NewMaterializer(deps Deps) *Materializer { retryWaiter: deps.Waiter, updateCh: make(chan struct{}), } + if v.retryWaiter == nil { + v.retryWaiter = &retry.Waiter{ + MinFailures: 1, + // Start backing off with small increments (200-400ms) which will double + // each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000 + // after that). (retry.Wait applies Max limit after jitter right now). + Factor: 200 * time.Millisecond, + MinWait: 0, + MaxWait: 60 * time.Second, + Jitter: retry.NewJitter(100), + } + } return v } @@ -204,81 +215,61 @@ func (m *Materializer) notifyUpdateLocked(err error) { m.updateCh = make(chan struct{}) } -// Fetch implements the logic a StreamingCacheType will need during it's Fetch -// call. Cache types that use streaming should just be able to proxy to this -// once they have a subscription object and return it's results directly. -func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) { - var result cache.FetchResult +type Result struct { + Index uint64 + Value interface{} +} - // Get current view Result and index +// getFromView blocks until the index of the View is greater than opts.MinIndex, +//or the context is cancelled. +func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result, error) { m.lock.Lock() - index := m.index - val, err := m.view.Result(m.index) + + result := Result{ + Index: m.index, + Value: m.view.Result(m.index), + } + updateCh := m.updateCh m.lock.Unlock() - if err != nil { - return result, err - } - - result.Index = index - result.Value = val - // If our index is > req.Index return right away. If index is zero then we // haven't loaded a snapshot at all yet which means we should wait for one on - // the update chan. 
Note it's opts.MinIndex that the cache is using here the - // request min index might be different and from initial user request. - if index > 0 && index > opts.MinIndex { + // the update chan. + if result.Index > 0 && result.Index > minIndex { return result, nil } - // Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout - // since that is the timeout the client requested from the cache Get while the - // options one is the internal "background refresh" timeout which is what the - // Fetch call should be using. - timeoutCh := time.After(opts.Timeout) for { select { case <-updateCh: // View updated, return the new result m.lock.Lock() result.Index = m.index - // Grab the new updateCh in case we need to keep waiting for the next - // update. - updateCh = m.updateCh - fetchErr := m.err - if fetchErr == nil { - // Only generate a new result if there was no error to avoid pointless - // work potentially shuffling the same data around. - result.Value, err = m.view.Result(m.index) - } - m.lock.Unlock() - // If there was a non-transient error return it - if fetchErr != nil { - return result, fetchErr - } - if err != nil { + switch { + case m.err != nil: + err := m.err + m.lock.Unlock() return result, err - } - - // Sanity check the update is actually later than the one the user - // requested. - if result.Index <= opts.MinIndex { - // The result is still older/same as the requested index, continue to - // wait for further updates. + case result.Index <= minIndex: + // get a reference to the new updateCh, the previous one was closed + updateCh = m.updateCh + m.lock.Unlock() continue } - // Return the updated result + result.Value = m.view.Result(m.index) + m.lock.Unlock() return result, nil - case <-timeoutCh: - // Just return whatever we got originally, might still be empty - return result, nil - - case <-done: - return result, context.Canceled + case <-ctx.Done(): + // Update the result value to the latest because callers may still + // use the value when the error is context.DeadlineExceeded + m.lock.Lock() + result.Value = m.view.Result(m.index) + m.lock.Unlock() + return result, ctx.Err() } } } diff --git a/agent/submatview/store.go b/agent/submatview/store.go new file mode 100644 index 0000000000..cf99857089 --- /dev/null +++ b/agent/submatview/store.go @@ -0,0 +1,240 @@ +package submatview + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/lib/ttlcache" +) + +// Store of Materializers. Store implements an interface similar to +// agent/cache.Cache, and allows a single Materializer to fulfil multiple requests +// as long as the requests are identical. +// Store is used in place of agent/cache.Cache because with the streaming +// backend there is no longer any need to run a background goroutine to refresh +// stored values. +type Store struct { + logger hclog.Logger + lock sync.RWMutex + byKey map[string]entry + + // expiryHeap tracks entries with 0 remaining requests. Entries are ordered + // by most recent expiry first. + expiryHeap *ttlcache.ExpiryHeap + + // idleTTL is the duration of time an entry should remain in the Store after the + // last request for that entry has been terminated. It is a field on the struct + // so that it can be patched in tests without need a lock. 
+ idleTTL time.Duration +} + +type entry struct { + materializer *Materializer + expiry *ttlcache.Entry + stop func() + // requests is the count of active requests using this entry. This entry will + // remain in the store as long as this count remains > 0. + requests int +} + +// NewStore creates and returns a Store that is ready for use. The caller must +// call Store.Run (likely in a separate goroutine) to start the expiration loop. +func NewStore(logger hclog.Logger) *Store { + return &Store{ + logger: logger, + byKey: make(map[string]entry), + expiryHeap: ttlcache.NewExpiryHeap(), + idleTTL: 20 * time.Minute, + } +} + +// Run the expiration loop until the context is cancelled. +func (s *Store) Run(ctx context.Context) { + for { + s.lock.RLock() + timer := s.expiryHeap.Next() + s.lock.RUnlock() + + select { + case <-ctx.Done(): + timer.Stop() + return + + // the first item in the heap has changed, restart the timer with the + // new TTL. + case <-s.expiryHeap.NotifyCh: + timer.Stop() + continue + + // the TTL for the first item has been reached, attempt an expiration. + case <-timer.Wait(): + s.lock.Lock() + + he := timer.Entry + s.expiryHeap.Remove(he.Index()) + + e := s.byKey[he.Key()] + + // Only stop the materializer if there are no active requests. + if e.requests == 0 { + e.stop() + delete(s.byKey, he.Key()) + } + + s.lock.Unlock() + } + } +} + +// Request is used to request data from the Store. +// Note that cache.Request is required, but some of the fields cache.RequestInfo +// fields are ignored (ex: MaxAge, and MustRevalidate). +type Request interface { + cache.Request + // NewMaterializer will be called if there is no active materializer to fulfil + // the request. It should return a Materializer appropriate for streaming + // data to fulfil this request. + NewMaterializer() (*Materializer, error) + // Type should return a string which uniquely identifies this type of request. + // The returned value is used as the prefix of the key used to index + // entries in the Store. + Type() string +} + +// Get a value from the store, blocking if the store has not yet seen the +// req.Index value. +// See agent/cache.Cache.Get for complete documentation. +func (s *Store) Get(ctx context.Context, req Request) (Result, error) { + info := req.CacheInfo() + key, materializer, err := s.readEntry(req) + if err != nil { + return Result{}, err + } + defer s.releaseEntry(key) + + ctx, cancel := context.WithTimeout(ctx, info.Timeout) + defer cancel() + + result, err := materializer.getFromView(ctx, info.MinIndex) + // context.DeadlineExceeded is translated to nil to match the behaviour of + // agent/cache.Cache.Get. + if err == nil || errors.Is(err, context.DeadlineExceeded) { + return result, nil + } + return result, err +} + +// Notify the updateCh when there are updates to the entry identified by req. +// See agent/cache.Cache.Notify for complete documentation. +// +// Request.CacheInfo().Timeout is ignored because it is not really relevant in +// this case. Instead set a deadline on the context. 
+func (s *Store) Notify( + ctx context.Context, + req Request, + correlationID string, + updateCh chan<- cache.UpdateEvent, +) error { + info := req.CacheInfo() + key, materializer, err := s.readEntry(req) + if err != nil { + return err + } + + go func() { + defer s.releaseEntry(key) + + index := info.MinIndex + for { + result, err := materializer.getFromView(ctx, index) + switch { + case ctx.Err() != nil: + return + case err != nil: + s.logger.Warn("handling error in Store.Notify", + "error", err, + "request-type", req.Type(), + "index", index) + continue + } + + index = result.Index + u := cache.UpdateEvent{ + CorrelationID: correlationID, + Result: result.Value, + Meta: cache.ResultMeta{Index: result.Index}, + } + select { + case updateCh <- u: + case <-ctx.Done(): + return + } + } + }() + return nil +} + +// readEntry from the store, and increment the requests counter. releaseEntry +// must be called when the request is finished to decrement the counter. +func (s *Store) readEntry(req Request) (string, *Materializer, error) { + info := req.CacheInfo() + key := makeEntryKey(req.Type(), info) + + s.lock.Lock() + defer s.lock.Unlock() + e, ok := s.byKey[key] + if ok { + e.requests++ + s.byKey[key] = e + return key, e.materializer, nil + } + + mat, err := req.NewMaterializer() + if err != nil { + return "", nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + go mat.Run(ctx) + + e = entry{ + materializer: mat, + stop: cancel, + requests: 1, + } + s.byKey[key] = e + return key, e.materializer, nil +} + +// releaseEntry decrements the request count and starts an expiry timer if the +// count has reached 0. Must be called once for every call to readEntry. +func (s *Store) releaseEntry(key string) { + s.lock.Lock() + defer s.lock.Unlock() + e := s.byKey[key] + e.requests-- + s.byKey[key] = e + + if e.requests > 0 { + return + } + + if e.expiry.Index() == ttlcache.NotIndexed { + e.expiry = s.expiryHeap.Add(key, s.idleTTL) + s.byKey[key] = e + return + } + + s.expiryHeap.Update(e.expiry.Index(), s.idleTTL) +} + +// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. 
+func makeEntryKey(typ string, r cache.RequestInfo) string { + return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) +} diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go new file mode 100644 index 0000000000..ea0e01c2bd --- /dev/null +++ b/agent/submatview/store_test.go @@ -0,0 +1,458 @@ +package submatview + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/lib/ttlcache" + "github.com/hashicorp/consul/proto/pbcommon" + "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/sdk/testutil/retry" +) + +func TestStore_Get(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := &fakeRequest{ + client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace), + } + req.client.QueueEvents( + newEndOfSnapshotEvent(2), + newEventServiceHealthRegister(10, 1, "srv1"), + newEventServiceHealthRegister(22, 2, "srv1")) + + runStep(t, "from empty store, starts materializer", func(t *testing.T) { + var result Result + retry.Run(t, func(r *retry.R) { + var err error + result, err = store.Get(ctx, req) + require.NoError(r, err) + require.Equal(r, uint64(22), result.Index) + }) + + r, ok := result.Value.(fakeResult) + require.True(t, ok) + require.Len(t, r.srvs, 2) + require.Equal(t, uint64(22), r.index) + + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, 0, e.expiry.Index()) + require.Equal(t, 0, e.requests) + + require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + }) + + runStep(t, "with an index that already exists in the view", func(t *testing.T) { + req.index = 21 + result, err := store.Get(ctx, req) + require.NoError(t, err) + require.Equal(t, uint64(22), result.Index) + + r, ok := result.Value.(fakeResult) + require.True(t, ok) + require.Len(t, r.srvs, 2) + require.Equal(t, uint64(22), r.index) + + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, 0, e.expiry.Index()) + require.Equal(t, 0, e.requests) + + require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + }) + + chResult := make(chan resultOrError, 1) + req.index = 40 + go func() { + result, err := store.Get(ctx, req) + chResult <- resultOrError{Result: result, Err: err} + }() + + runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) { + select { + case <-chResult: + t.Fatalf("expected Get to block") + case <-time.After(50 * time.Millisecond): + } + + store.lock.Lock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + store.lock.Unlock() + require.Equal(t, 1, e.requests) + }) + + runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) { + req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1")) + + select { + case <-chResult: + t.Fatalf("expected Get to block") + case <-time.After(50 * time.Millisecond): + } + + store.lock.Lock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + store.lock.Unlock() + require.Equal(t, 1, e.requests) + }) + + runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) { + 
req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1")) + var getResult resultOrError + select { + case getResult = <-chResult: + case <-time.After(100 * time.Millisecond): + t.Fatalf("expected Get to unblock when new events are received") + } + + require.NoError(t, getResult.Err) + require.Equal(t, uint64(41), getResult.Result.Index) + + r, ok := getResult.Result.Value.(fakeResult) + require.True(t, ok) + require.Len(t, r.srvs, 2) + require.Equal(t, uint64(41), r.index) + + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, 0, e.expiry.Index()) + require.Equal(t, 0, e.requests) + + require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + }) +} + +type resultOrError struct { + Result Result + Err error +} + +type fakeRequest struct { + index uint64 + key string + client *TestStreamingClient +} + +func (r *fakeRequest) CacheInfo() cache.RequestInfo { + key := r.key + if key == "" { + key = "key" + } + return cache.RequestInfo{ + Key: key, + Token: "abcd", + Datacenter: "dc1", + Timeout: 4 * time.Second, + MinIndex: r.index, + } +} + +func (r *fakeRequest) NewMaterializer() (*Materializer, error) { + return NewMaterializer(Deps{ + View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)}, + Client: r.client, + Logger: hclog.New(nil), + Request: func(index uint64) pbsubscribe.SubscribeRequest { + req := pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "key", + Token: "abcd", + Datacenter: "dc1", + Index: index, + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, + } + return req + }, + }), nil +} + +func (r *fakeRequest) Type() string { + return fmt.Sprintf("%T", r) +} + +type fakeView struct { + srvs map[string]*pbservice.CheckServiceNode +} + +func (f *fakeView) Update(events []*pbsubscribe.Event) error { + for _, event := range events { + serviceHealth := event.GetServiceHealth() + if serviceHealth == nil { + return fmt.Errorf("unexpected event type for service health view: %T", + event.GetPayload()) + } + + id := serviceHealth.CheckServiceNode.UniqueID() + switch serviceHealth.Op { + case pbsubscribe.CatalogOp_Register: + f.srvs[id] = serviceHealth.CheckServiceNode + + case pbsubscribe.CatalogOp_Deregister: + delete(f.srvs, id) + } + } + return nil +} + +func (f *fakeView) Result(index uint64) interface{} { + srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs)) + for _, srv := range f.srvs { + srvs = append(srvs, srv) + } + return fakeResult{srvs: srvs, index: index} +} + +type fakeResult struct { + srvs []*pbservice.CheckServiceNode + index uint64 +} + +func (f *fakeView) Reset() { + f.srvs = make(map[string]*pbservice.CheckServiceNode) +} + +func TestStore_Notify(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := &fakeRequest{ + client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace), + } + req.client.QueueEvents( + newEndOfSnapshotEvent(2), + newEventServiceHealthRegister(22, 2, "srv1")) + + cID := "correlate" + ch := make(chan cache.UpdateEvent) + + err := store.Notify(ctx, req, cID, ch) + require.NoError(t, err) + + runStep(t, "from empty store, starts materializer", func(t *testing.T) { + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, ttlcache.NotIndexed, e.expiry.Index()) + 
require.Equal(t, 1, e.requests) + }) + + runStep(t, "updates are received", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + select { + case update := <-ch: + require.NoError(r, update.Err) + require.Equal(r, cID, update.CorrelationID) + require.Equal(r, uint64(22), update.Meta.Index) + require.Equal(r, uint64(22), update.Result.(fakeResult).index) + case <-time.After(100 * time.Millisecond): + r.Stop(fmt.Errorf("expected Get to unblock when new events are received")) + } + }) + + req.client.QueueEvents(newEventServiceHealthRegister(24, 2, "srv1")) + + select { + case update := <-ch: + require.NoError(t, update.Err) + require.Equal(t, cID, update.CorrelationID) + require.Equal(t, uint64(24), update.Meta.Index) + require.Equal(t, uint64(24), update.Result.(fakeResult).index) + case <-time.After(100 * time.Millisecond): + t.Fatalf("expected Get to unblock when new events are received") + } + }) + + runStep(t, "closing the notify starts the expiry counter", func(t *testing.T) { + cancel() + + retry.Run(t, func(r *retry.R) { + store.lock.Lock() + defer store.lock.Unlock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(r, 0, e.expiry.Index()) + require.Equal(r, 0, e.requests) + require.Equal(r, store.expiryHeap.Next().Entry, e.expiry) + }) + }) +} + +func TestStore_Notify_ManyRequests(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := NewStore(hclog.New(nil)) + go store.Run(ctx) + + req := &fakeRequest{ + client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace), + } + req.client.QueueEvents(newEndOfSnapshotEvent(2)) + + cID := "correlate" + ch1 := make(chan cache.UpdateEvent) + ch2 := make(chan cache.UpdateEvent) + + require.NoError(t, store.Notify(ctx, req, cID, ch1)) + assertRequestCount(t, store, req, 1) + + require.NoError(t, store.Notify(ctx, req, cID, ch2)) + assertRequestCount(t, store, req, 2) + + req.index = 15 + + go func() { + _, _ = store.Get(ctx, req) + }() + + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 3) + }) + + go func() { + _, _ = store.Get(ctx, req) + }() + + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 4) + }) + + var req2 *fakeRequest + + runStep(t, "Get and Notify with a different key", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req2 = &fakeRequest{client: req.client, key: "key2", index: 22} + + require.NoError(t, store.Notify(ctx, req2, cID, ch1)) + go func() { + _, _ = store.Get(ctx, req2) + }() + + // the original entry should still be at count 4 + assertRequestCount(t, store, req, 4) + // the new entry should be at count 2 + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req2, 2) + }) + }) + + runStep(t, "end all the requests", func(t *testing.T) { + req.client.QueueEvents( + newEventServiceHealthRegister(10, 1, "srv1"), + newEventServiceHealthRegister(12, 2, "srv1"), + newEventServiceHealthRegister(13, 1, "srv2"), + newEventServiceHealthRegister(16, 3, "srv2")) + + // The two Get requests should exit now that the index has been updated + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 2) + }) + + // Cancel the context so all requests terminate + cancel() + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 0) + }) + }) + + runStep(t, "the expiry heap should contain two entries", func(t *testing.T) { + store.lock.Lock() + defer store.lock.Unlock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] 
+ e2 := store.byKey[makeEntryKey(req2.Type(), req2.CacheInfo())] + require.Equal(t, 0, e2.expiry.Index()) + require.Equal(t, 1, e.expiry.Index()) + + require.Equal(t, store.expiryHeap.Next().Entry, e2.expiry) + }) +} + +type testingT interface { + Helper() + Fatalf(string, ...interface{}) +} + +func assertRequestCount(t testingT, s *Store, req Request, expected int) { + t.Helper() + + key := makeEntryKey(req.Type(), req.CacheInfo()) + + s.lock.Lock() + defer s.lock.Unlock() + actual := s.byKey[key].requests + if actual != expected { + t.Fatalf("expected request count to be %d, got %d", expected, actual) + } +} + +func TestStore_Run_ExpiresEntries(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := NewStore(hclog.New(nil)) + ttl := 10 * time.Millisecond + store.idleTTL = ttl + go store.Run(ctx) + + req := &fakeRequest{ + client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace), + } + req.client.QueueEvents(newEndOfSnapshotEvent(2)) + + cID := "correlate" + ch1 := make(chan cache.UpdateEvent) + + reqCtx, reqCancel := context.WithCancel(context.Background()) + defer reqCancel() + + require.NoError(t, store.Notify(reqCtx, req, cID, ch1)) + assertRequestCount(t, store, req, 1) + + // Get a copy of the entry so that we can check it was expired later + store.lock.Lock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + store.lock.Unlock() + + reqCancel() + retry.Run(t, func(r *retry.R) { + assertRequestCount(r, store, req, 0) + }) + + // wait for the entry to expire, with lots of buffer + time.Sleep(3 * ttl) + + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 0) + require.Equal(t, ttlcache.NotIndexed, e.expiry.Index()) +} + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} diff --git a/agent/cache-types/streaming_events_test.go b/agent/submatview/streaming_test.go similarity index 56% rename from agent/cache-types/streaming_events_test.go rename to agent/submatview/streaming_test.go index 272372754d..80fec094fe 100644 --- a/agent/cache-types/streaming_events_test.go +++ b/agent/submatview/streaming_test.go @@ -1,7 +1,11 @@ -package cachetype +package submatview import ( + "context" "fmt" + "sync" + + "google.golang.org/grpc" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" @@ -9,6 +13,84 @@ import ( "github.com/hashicorp/consul/types" ) +// TestStreamingClient is a mock StreamingClient for testing that allows +// for queueing up custom events to a subscriber. 
+type TestStreamingClient struct { + expectedNamespace string + subClients []*subscribeClient + lock sync.RWMutex + events []eventOrErr +} + +type eventOrErr struct { + Err error + Event *pbsubscribe.Event +} + +func NewTestStreamingClient(ns string) *TestStreamingClient { + return &TestStreamingClient{expectedNamespace: ns} +} + +func (s *TestStreamingClient) Subscribe( + ctx context.Context, + req *pbsubscribe.SubscribeRequest, + _ ...grpc.CallOption, +) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { + if req.Namespace != s.expectedNamespace { + return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v", + req.Namespace, s.expectedNamespace) + } + c := &subscribeClient{ + events: make(chan eventOrErr, 32), + ctx: ctx, + } + s.lock.Lock() + s.subClients = append(s.subClients, c) + for _, event := range s.events { + c.events <- event + } + s.lock.Unlock() + return c, nil +} + +type subscribeClient struct { + grpc.ClientStream + events chan eventOrErr + ctx context.Context +} + +func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { + s.lock.Lock() + for _, e := range events { + s.events = append(s.events, eventOrErr{Event: e}) + for _, c := range s.subClients { + c.events <- eventOrErr{Event: e} + } + } + s.lock.Unlock() +} + +func (s *TestStreamingClient) QueueErr(err error) { + s.lock.Lock() + s.events = append(s.events, eventOrErr{Err: err}) + for _, c := range s.subClients { + c.events <- eventOrErr{Err: err} + } + s.lock.Unlock() +} + +func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) { + select { + case eoe := <-c.events: + if eoe.Err != nil { + return nil, eoe.Err + } + return eoe.Event, nil + case <-c.ctx.Done(): + return nil, c.ctx.Err() + } +} + func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { return &pbsubscribe.Event{ Index: index, @@ -22,13 +104,6 @@ func newNewSnapshotToFollowEvent() *pbsubscribe.Event { } } -// newEventServiceHealthRegister returns a realistically populated service -// health registration event for tests. The nodeNum is a -// logical node and is used to create the node name ("node%d") but also change -// the node ID and IP address to make it a little more realistic for cases that -// need that. nodeNum should be less than 64k to make the IP address look -// realistic. Any other changes can be made on the returned event to avoid -// adding too many options to callers. 
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { node := fmt.Sprintf("node%d", nodeNum) nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) @@ -54,61 +129,17 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub ID: svc, Service: svc, Port: 8080, - Weights: &pbservice.Weights{ - Passing: 1, - Warning: 1, - }, - // Empty sadness - Proxy: pbservice.ConnectProxyConfig{ - MeshGateway: pbservice.MeshGatewayConfig{}, - Expose: pbservice.ExposeConfig{}, - }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, RaftIndex: pbcommon.RaftIndex{ CreateIndex: index, ModifyIndex: index, }, }, - Checks: []*pbservice.HealthCheck{ - { - Node: node, - CheckID: "serf-health", - Name: "serf-health", - Status: "passing", - EnterpriseMeta: pbcommon.EnterpriseMeta{}, - RaftIndex: pbcommon.RaftIndex{ - CreateIndex: index, - ModifyIndex: index, - }, - }, - { - Node: node, - CheckID: types.CheckID("service:" + svc), - Name: "service:" + svc, - ServiceID: svc, - ServiceName: svc, - Type: "ttl", - Status: "passing", - EnterpriseMeta: pbcommon.EnterpriseMeta{}, - RaftIndex: pbcommon.RaftIndex{ - CreateIndex: index, - ModifyIndex: index, - }, - }, - }, }, }, }, } } -// TestEventServiceHealthDeregister returns a realistically populated service -// health deregistration event for tests. The nodeNum is a -// logical node and is used to create the node name ("node%d") but also change -// the node ID and IP address to make it a little more realistic for cases that -// need that. nodeNum should be less than 64k to make the IP address look -// realistic. Any other changes can be made on the returned event to avoid -// adding too many options to callers. func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { node := fmt.Sprintf("node%d", nodeNum) @@ -129,12 +160,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs Passing: 1, Warning: 1, }, - // Empty sadness - Proxy: pbservice.ConnectProxyConfig{ - MeshGateway: pbservice.MeshGatewayConfig{}, - Expose: pbservice.ExposeConfig{}, - }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, RaftIndex: pbcommon.RaftIndex{ // The original insertion index since a delete doesn't update // this. This magic value came from state store tests where we diff --git a/sdk/testutil/retry/retry.go b/sdk/testutil/retry/retry.go index 09f845abe9..0b6e1d7071 100644 --- a/sdk/testutil/retry/retry.go +++ b/sdk/testutil/retry/retry.go @@ -34,6 +34,7 @@ type Failer interface { // R provides context for the retryer. type R struct { fail bool + done bool output []string } @@ -77,6 +78,12 @@ func (r *R) log(s string) { r.output = append(r.output, decorate(s)) } +// Stop retrying, and fail the test with the specified error. 
+func (r *R) Stop(err error) { + r.log(err.Error()) + r.done = true +} + func decorate(s string) string { _, file, line, ok := runtime.Caller(3) if ok { @@ -120,6 +127,7 @@ func dedup(a []string) string { func run(r Retryer, t Failer, f func(r *R)) { t.Helper() rr := &R{} + fail := func() { t.Helper() out := dedup(rr.output) @@ -128,7 +136,8 @@ func run(r Retryer, t Failer, f func(r *R)) { } t.FailNow() } - for r.NextOr(t, fail) { + + for r.Continue() { func() { defer func() { if p := recover(); p != nil && p != runFailed { @@ -137,11 +146,17 @@ func run(r Retryer, t Failer, f func(r *R)) { }() f(rr) }() - if !rr.fail { + + switch { + case rr.done: + fail() + return + case !rr.fail: return } rr.fail = false } + fail() } // DefaultFailer provides default retry.Run() behavior for unit tests. @@ -162,9 +177,9 @@ func ThreeTimes() *Counter { // Retryer provides an interface for repeating operations // until they succeed or an exit condition is met. type Retryer interface { - // NextOr returns true if the operation should be repeated. - // Otherwise, it calls fail and returns false. - NextOr(t Failer, fail func()) bool + // Continue returns true if the operation should be repeated, otherwise it + // returns false to indicate retrying should stop. + Continue() bool } // Counter repeats an operation a given number of @@ -176,10 +191,8 @@ type Counter struct { count int } -func (r *Counter) NextOr(t Failer, fail func()) bool { - t.Helper() +func (r *Counter) Continue() bool { if r.count == r.Count { - fail() return false } if r.count > 0 { @@ -200,14 +213,12 @@ type Timer struct { stop time.Time } -func (r *Timer) NextOr(t Failer, fail func()) bool { - t.Helper() +func (r *Timer) Continue() bool { if r.stop.IsZero() { r.stop = time.Now().Add(r.Timeout) return true } if time.Now().After(r.stop) { - fail() return false } time.Sleep(r.Wait) diff --git a/sdk/testutil/retry/retry_test.go b/sdk/testutil/retry/retry_test.go index f58f8eb92f..31923a0bfb 100644 --- a/sdk/testutil/retry/retry_test.go +++ b/sdk/testutil/retry/retry_test.go @@ -1,8 +1,11 @@ package retry import ( + "fmt" "testing" "time" + + "github.com/stretchr/testify/require" ) // delta defines the time band a test run should complete in. @@ -19,19 +22,15 @@ func TestRetryer(t *testing.T) { for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - var iters, fails int - fail := func() { fails++ } + var iters int start := time.Now() - for tt.r.NextOr(t, fail) { + for tt.r.Continue() { iters++ } dur := time.Since(start) if got, want := iters, 3; got != want { t.Fatalf("got %d retries want %d", got, want) } - if got, want := fails, 1; got != want { - t.Fatalf("got %d FailNow calls want %d", got, want) - } // since the first iteration happens immediately // the retryer waits only twice for three iterations. 
// order of events: (true, (wait) true, (wait) true, false) @@ -41,3 +40,52 @@ func TestRetryer(t *testing.T) { }) } } + +func TestRunWith(t *testing.T) { + t.Run("calls FailNow after exceeding retries", func(t *testing.T) { + ft := &fakeT{} + iter := 0 + RunWith(&Counter{Count: 3, Wait: time.Millisecond}, ft, func(r *R) { + iter++ + r.FailNow() + }) + + require.Equal(t, 3, iter) + require.Equal(t, 1, ft.fails) + }) + + t.Run("Stop ends the retrying", func(t *testing.T) { + ft := &fakeT{} + iter := 0 + RunWith(&Counter{Count: 5, Wait: time.Millisecond}, ft, func(r *R) { + iter++ + if iter == 2 { + r.Stop(fmt.Errorf("do not proceed")) + } + r.Fatalf("not yet") + }) + + require.Equal(t, 2, iter) + require.Equal(t, 1, ft.fails) + require.Len(t, ft.out, 1) + require.Contains(t, ft.out[0], "not yet\n") + require.Contains(t, ft.out[0], "do not proceed\n") + }) +} + +type fakeT struct { + fails int + out []string +} + +func (f *fakeT) Helper() {} + +func (f *fakeT) Log(args ...interface{}) { + f.out = append(f.out, fmt.Sprint(args...)) +} + +func (f *fakeT) FailNow() { + f.fails++ +} + +var _ Failer = &fakeT{} diff --git a/website/content/api-docs/health.mdx b/website/content/api-docs/health.mdx index 4a3f1b759b..220244dc63 100644 --- a/website/content/api-docs/health.mdx +++ b/website/content/api-docs/health.mdx @@ -229,9 +229,10 @@ The table below shows this endpoint's support for - `near` `(string: "")` - Specifies a node name to sort the node list in ascending order based on the estimated round trip time from that node. Passing `?near=_agent` will use the agent's node for the sort. This is specified as - part of the URL as a query parameter. **Note** that `near` can not be used if - [`use_streaming_backend`](/docs/agent/options#use_streaming_backend) - is enabled, because the data is not available to sort the results. + part of the URL as a query parameter. **Note** that using `near` will ignore + [`use_streaming_backend`](/docs/agent/options#use_streaming_backend) and always + use blocking queries, because the data required to sort the results is not available + to the streaming backend. - `tag` `(string: "")` **Deprecated** - Use `filter` with the `Service.Tags` selector instead. This parameter will be removed in a future version of Consul.
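
With agent/cache out of the streaming path, consumers go through `submatview.Store` instead: they hand it a `submatview.Request` (which embeds `cache.Request`) and the store shares one `Materializer` per identical request, reference-counting active callers and expiring idle entries after `idleTTL`. The sketch below is illustrative only and not part of the patch: `countView`, `serviceCountRequest`, `watchService`, and `notifyService` are invented names, the view does nothing more than count events, and the `Store` is assumed to have been created with `NewStore` and to have `Run` started in a separate goroutine.

```go
package example

import (
	"context"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/submatview"
	"github.com/hashicorp/consul/proto/pbsubscribe"
)

// countView is a deliberately tiny submatview.View: it only counts events.
// A real view, like the health view in this patch, materializes full results.
type countView struct {
	count uint64
}

func (v *countView) Update(events []*pbsubscribe.Event) error {
	v.count += uint64(len(events))
	return nil
}

// Result returns whatever the view has materialized; the index is passed in
// so the view does not have to track it itself.
func (v *countView) Result(index uint64) interface{} {
	return struct {
		Index uint64
		Count uint64
	}{Index: index, Count: v.count}
}

// Reset returns the view to its zero state before a new snapshot is applied.
func (v *countView) Reset() { v.count = 0 }

// serviceCountRequest implements submatview.Request. Only the three methods
// matter to the Store; the fields are illustrative.
type serviceCountRequest struct {
	client   submatview.StreamClient
	service  string
	minIndex uint64
}

func (r *serviceCountRequest) CacheInfo() cache.RequestInfo {
	return cache.RequestInfo{
		Key:        r.service,
		Datacenter: "dc1",
		Timeout:    10 * time.Minute,
		MinIndex:   r.minIndex,
	}
}

// Type prefixes the Store entry key, so identical requests of the same type
// share a single materializer.
func (r *serviceCountRequest) Type() string { return "example.service-count" }

// NewMaterializer is only called when no materializer exists for this key yet.
func (r *serviceCountRequest) NewMaterializer() (*submatview.Materializer, error) {
	return submatview.NewMaterializer(submatview.Deps{
		View:   &countView{},
		Client: r.client,
		Logger: hclog.New(nil),
		Request: func(index uint64) pbsubscribe.SubscribeRequest {
			return pbsubscribe.SubscribeRequest{
				Topic:      pbsubscribe.Topic_ServiceHealth,
				Key:        r.service,
				Datacenter: "dc1",
				Index:      index,
			}
		},
	}), nil
}

// watchService blocks until the materialized view advances past minIndex,
// mirroring a blocking query against agent/cache.
func watchService(ctx context.Context, store *submatview.Store, client submatview.StreamClient) (submatview.Result, error) {
	req := &serviceCountRequest{client: client, service: "web", minIndex: 0}
	return store.Get(ctx, req)
}

// notifyService streams updates to ch until ctx is cancelled. The Store
// reference-counts the shared materializer while the notification is active.
func notifyService(ctx context.Context, store *submatview.Store, client submatview.StreamClient, ch chan<- cache.UpdateEvent) error {
	req := &serviceCountRequest{client: client, service: "web"}
	return store.Notify(ctx, req, "example-correlation-id", ch)
}
```

`Deps.Waiter` is left unset because the Materializer now falls back to a default `retry.Waiter` when none is provided. As with `agent/cache.Cache.Get`, if the deadline derived from `CacheInfo().Timeout` expires before the index advances, `Get` returns the current result with a nil error; `Notify` instead keeps pushing `cache.UpdateEvent` values carrying the correlation ID and the index of each new result until the context is cancelled.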
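
In `sdk/testutil/retry`, `Retryer.NextOr` is replaced by `Continue`, which leaves failing the test to `run` itself, and `R` gains `Stop` for aborting a retry loop with a specific error. A hypothetical test sketches the intended usage; `checkStatus` and the messages are placeholders.

```go
package example

import (
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/consul/sdk/testutil/retry"
)

// TestEventuallyHealthy polls a condition until it passes, the retryer is
// exhausted, or Stop aborts the loop early with a specific failure.
func TestEventuallyHealthy(t *testing.T) {
	retry.RunWith(&retry.Timer{Timeout: time.Second, Wait: 25 * time.Millisecond}, t, func(r *retry.R) {
		status, err := checkStatus()
		if err != nil {
			// A hard failure will never recover, so stop retrying and fail
			// the test with this error. Stop does not abort the function the
			// way Fatalf does, so return explicitly.
			r.Stop(fmt.Errorf("unrecoverable error: %w", err))
			return
		}
		if status != "passing" {
			// Transient: mark this attempt as failed; the retryer decides
			// whether to run the function again.
			r.Fatalf("expected status passing, got %q", status)
		}
	})
}

// checkStatus stands in for whatever condition the test is polling.
func checkStatus() (string, error) {
	return "passing", nil
}
```

Unlike `Fatalf`, `Stop` does not panic out of the callback, so the callback should return after calling it; when the loop ends, the accumulated output of the final attempt, including the `Stop` error, is logged before `FailNow`.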
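
The health API documentation change means `near` and the streaming backend are no longer mutually exclusive; the agent simply answers `near`-sorted queries with a blocking query. Purely for illustration (the Go API client in `github.com/hashicorp/consul/api` is not touched by this patch, and the service name is a placeholder), such a request might look like this:

```go
package example

import (
	"fmt"

	"github.com/hashicorp/consul/api"
)

// nearestWebNodes lists healthy instances of "web", sorted by estimated RTT
// from the local agent. With this change such a request is served by a normal
// blocking query even when use_streaming_backend is enabled.
func nearestWebNodes() error {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		return err
	}

	entries, _, err := client.Health().Service("web", "", true, &api.QueryOptions{
		Near: "_agent",
	})
	if err != nil {
		return err
	}

	for _, e := range entries {
		fmt.Printf("%s -> %s:%d\n", e.Node.Node, e.Service.Address, e.Service.Port)
	}
	return nil
}
```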