Merge pull request #10112 from hashicorp/dnephin/remove-streaming-from-cache

streaming: replace agent/cache with submatview.Store
Daniel Nephin 2021-04-28 17:31:42 -04:00 committed by hc-github-team-consul-core
parent 8cc2d3ec4f
commit 798953f57d
21 changed files with 1796 additions and 732 deletions

.changelog/10112.txt (new file)

@ -0,0 +1,3 @@
```release-note:bug
streaming: fixes a bug that would cause context cancellation errors when a cache entry expired while requests were active.
```
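For context, the failure mode this note describes generally looks like the sketch below: a blocking request that shares a context owned by a cache entry gets cancelled when the entry's TTL expires while the request is still in flight. This is an illustrative, self-contained Go example, not code from this commit.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Context owned by the "cache entry"; expiring the entry cancels it.
	entryCtx, expire := context.WithCancel(context.Background())

	// An active blocking request waits on the shared context.
	done := make(chan error, 1)
	go func() {
		select {
		case <-time.After(time.Second):
			done <- nil // request would normally complete
		case <-entryCtx.Done():
			done <- entryCtx.Err() // surfaces as a context cancellation error
		}
	}()

	// The entry's TTL fires while the request is still active.
	time.Sleep(50 * time.Millisecond)
	expire()

	fmt.Println(<-done) // context canceled
}
```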


@ -373,15 +373,21 @@ func New(bd BaseDeps) (*Agent, error) {
cache: bd.Cache,
}
cacheName := cachetype.HealthServicesName
if bd.RuntimeConfig.UseStreamingBackend {
cacheName = cachetype.StreamingHealthServicesName
// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
if err != nil {
return nil, err
}
a.rpcClientHealth = &health.Client{
Cache: bd.Cache,
NetRPC: &a,
CacheName: cacheName,
CacheNameIngress: cachetype.HealthServicesName,
Cache: bd.Cache,
NetRPC: &a,
CacheName: cachetype.HealthServicesName,
ViewStore: bd.ViewStore,
MaterializerDeps: health.MaterializerDeps{
Conn: conn,
Logger: bd.Logger.Named("rpcclient.health"),
},
}
a.serviceManager = NewServiceManager(&a)
@ -533,6 +539,8 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy)
}
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache,
@ -540,7 +548,6 @@ func (a *Agent) Start(ctx context.Context) error {
Logger: a.logger.Named(logging.ProxyConfig),
State: a.State,
Source: &structs.QuerySource{
Node: a.config.NodeName,
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
},
@ -1385,6 +1392,8 @@ func (a *Agent) ShutdownAgent() error {
a.cache.Close()
}
a.rpcClientHealth.Close()
var err error
if a.delegate != nil {
err = a.delegate.Shutdown()


@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"gopkg.in/square/go-jose.v2/jwt"
"github.com/hashicorp/consul/agent/cache"
@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
}
}
type fakeGRPCConnPool struct{}
func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
return nil, nil
}
func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{


@ -1,66 +0,0 @@
package cachetype
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
events chan eventOrErr
ctx context.Context
expectedNamespace string
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
expectedNamespace: ns,
}
}
func (t *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != t.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, t.expectedNamespace)
}
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}


@ -1,12 +1,13 @@
package consul
import (
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
)
type Deps struct {
@ -15,5 +16,9 @@ type Deps struct {
Tokens *token.Store
Router *router.Router
ConnPool *pool.ConnPool
GRPCConnPool *grpc.ClientConnPool
GRPCConnPool GRPCClientConner
}
type GRPCClientConner interface {
ClientConn(datacenter string) (*grpc.ClientConn, error)
}


@ -5,14 +5,13 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
agentgrpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/rpc/subscribe"
"github.com/hashicorp/consul/agent/structs"
)
type subscribeBackend struct {
srv *Server
connPool *agentgrpc.ClientConnPool
connPool GRPCClientConner
}
// TODO: refactor Resolve methods to an ACLBackend that can be used by all


@ -219,13 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
return nil, nil
}
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"}
}
out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
if err != nil {
return nil, err


@ -352,10 +352,7 @@ func testManager_BasicLifecycle(
require := require.New(t)
logger := testutil.Logger(t)
state := local.NewState(local.Config{}, logger, &token.Store{})
source := &structs.QuerySource{
Node: "node1",
Datacenter: "dc1",
}
source := &structs.QuerySource{Datacenter: "dc1"}
// Stub state syncing
state.TriggerSyncChanges = func() {}


@ -5,16 +5,18 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// Client provides access to service health data.
type Client struct {
NetRPC NetRPC
Cache CacheGetter
// CacheName to use for service health.
CacheName string
// CacheNameIngress is the name of the cache type to use for ingress
// service health.
CacheNameIngress string
NetRPC NetRPC
Cache CacheGetter
ViewStore MaterializedViewStore
MaterializerDeps MaterializerDeps
CacheName string
UseStreamingBackend bool
}
type NetRPC interface {
@ -26,10 +28,23 @@ type CacheGetter interface {
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
}
type MaterializedViewStore interface {
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
}
func (c *Client) ServiceNodes(
ctx context.Context,
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
if err != nil {
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
}
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
}
out, md, err := c.getServiceNodes(ctx, req)
if err != nil {
return out, md, err
@ -50,18 +65,12 @@ func (c *Client) getServiceNodes(
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
var out structs.IndexedCheckServiceNodes
if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
}
cacheName := c.CacheName
if req.Ingress {
cacheName = c.CacheNameIngress
}
raw, md, err := c.Cache.Get(ctx, cacheName, &req)
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
if err != nil {
return out, md, err
}
@ -80,9 +89,55 @@ func (c *Client) Notify(
correlationID string,
ch chan<- cache.UpdateEvent,
) error {
cacheName := c.CacheName
if req.Ingress {
cacheName = c.CacheNameIngress
if c.useStreaming(req) {
sr := c.newServiceRequest(req)
return c.ViewStore.Notify(ctx, sr, correlationID, ch)
}
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
}
func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
}
func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
return serviceRequest{
ServiceSpecificRequest: req,
deps: c.MaterializerDeps,
}
}
// Close any underlying connections used by the client.
func (c *Client) Close() error {
if c == nil {
return nil
}
return c.MaterializerDeps.Conn.Close()
}
type serviceRequest struct {
structs.ServiceSpecificRequest
deps MaterializerDeps
}
func (r serviceRequest) CacheInfo() cache.RequestInfo {
return r.ServiceSpecificRequest.CacheInfo()
}
func (r serviceRequest) Type() string {
return "service-health"
}
func (r serviceRequest) NewMaterializer() (*submatview.Materializer, error) {
view, err := newHealthView(r.ServiceSpecificRequest)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn),
Logger: r.deps.Logger,
Request: newMaterializerRequest(r.ServiceSpecificRequest),
}), nil
}
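To show how these pieces fit together, below is a rough wiring sketch modeled on the agent.go hunk earlier in this commit. The example package name, the import path for the health package, and the newHealthClient/watchService helpers are assumptions for illustration; only the field names and signatures come from the diff.

```go
package example

import (
	"context"

	"github.com/hashicorp/go-hclog"
	"google.golang.org/grpc"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/rpcclient/health" // path assumed from the "rpcclient.health" logger name
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/submatview"
)

// newHealthClient mirrors the construction in agent.go above.
func newHealthClient(
	netRPC health.NetRPC,
	c *cache.Cache,
	store *submatview.Store,
	conn *grpc.ClientConn,
	logger hclog.Logger,
) *health.Client {
	return &health.Client{
		NetRPC:              netRPC,
		Cache:               c,
		ViewStore:           store,
		CacheName:           cachetype.HealthServicesName,
		UseStreamingBackend: true, // assumed: the agent wires this from RuntimeConfig
		MaterializerDeps: health.MaterializerDeps{
			Conn:   conn,
			Logger: logger,
		},
	}
}

// watchService issues a blocking query. With streaming enabled and no
// Ingress flag or near (Source.Node) filter, ServiceNodes is served by the
// submatview.Store instead of the agent cache.
func watchService(ctx context.Context, client *health.Client, minIndex uint64) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
	return client.ServiceNodes(ctx, structs.ServiceSpecificRequest{
		Datacenter:   "dc1",
		ServiceName:  "web",
		QueryOptions: structs.QueryOptions{MinQueryIndex: minIndex},
	})
}
```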


@ -0,0 +1,235 @@
package health
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
expected func(t *testing.T, c *Client)
}
run := func(t *testing.T, tc testCase) {
c := &Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
}
_, _, err := c.ServiceNodes(context.Background(), tc.req)
require.NoError(t, err)
tc.expected(t, c)
}
var testCases = []testCase{
{
name: "rpc by default",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
},
expected: useRPC,
},
{
name: "use streaming instead of cache",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{UseCache: true},
},
expected: useStreaming,
},
{
name: "use streaming for MinQueryIndex",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
},
expected: useStreaming,
},
{
name: "use cache for ingress request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{UseCache: true},
Ingress: true,
},
expected: useCache,
},
{
name: "use cache for near request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{UseCache: true},
Source: structs.QuerySource{Node: "node1"},
},
expected: useCache,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func useRPC(t *testing.T, c *Client) {
t.Helper()
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, cache.calls, 0)
require.Len(t, store.calls, 0)
require.Equal(t, []string{"Health.ServiceNodes"}, rpc.calls)
}
func useStreaming(t *testing.T, c *Client) {
t.Helper()
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, cache.calls, 0)
require.Len(t, rpc.calls, 0)
require.Len(t, store.calls, 1)
}
func useCache(t *testing.T, c *Client) {
t.Helper()
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, rpc.calls, 0)
require.Len(t, store.calls, 0)
require.Equal(t, []string{"cache-no-streaming"}, cache.calls)
}
type fakeCache struct {
calls []string
}
func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) {
f.calls = append(f.calls, t)
result := &structs.IndexedCheckServiceNodes{}
return result, cache.ResultMeta{}, nil
}
func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error {
f.calls = append(f.calls, t)
return nil
}
type fakeNetRPC struct {
calls []string
}
func (f *fakeNetRPC) RPC(method string, _ interface{}, _ interface{}) error {
f.calls = append(f.calls, method)
return nil
}
type fakeViewStore struct {
calls []submatview.Request
}
func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) {
f.calls = append(f.calls, req)
return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil
}
func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error {
f.calls = append(f.calls, req)
return nil
}
func TestClient_Notify_BackendRouting(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
expected func(t *testing.T, c *Client)
}
run := func(t *testing.T, tc testCase) {
c := &Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
}
err := c.Notify(context.Background(), tc.req, "cid", nil)
require.NoError(t, err)
tc.expected(t, c)
}
var testCases = []testCase{
{
name: "streaming by default",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
},
expected: useStreaming,
},
{
name: "use cache for ingress request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
Ingress: true,
},
expected: useCache,
},
{
name: "use cache for near request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
Source: structs.QuerySource{Node: "node1"},
},
expected: useCache,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}


@ -0,0 +1,69 @@
package health
import (
"context"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// streamClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type streamClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
subFn func(*pbsubscribe.SubscribeRequest) error
events chan eventOrErr
ctx context.Context
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func newStreamClient(sub func(req *pbsubscribe.SubscribeRequest) error) *streamClient {
if sub == nil {
sub = func(*pbsubscribe.SubscribeRequest) error {
return nil
}
}
return &streamClient{
events: make(chan eventOrErr, 32),
subFn: sub,
}
}
func (t *streamClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if err := t.subFn(req); err != nil {
return nil, err
}
t.ctx = ctx
return t, nil
}
func (t *streamClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *streamClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *streamClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}


@ -1,74 +1,27 @@
package cachetype
package health
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"time"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
const (
// Recommended name for registration.
StreamingHealthServicesName = "streaming-health-services"
)
// StreamingHealthServices supports fetching discovering service instances via the
// catalog using the streaming gRPC endpoint.
type StreamingHealthServices struct {
RegisterOptionsBlockingRefresh
deps MaterializerDeps
}
// RegisterOptions returns options with a much shorter LastGetTTL than the default.
// Unlike other cache-types, StreamingHealthServices runs a materialized view in
// the background which will receive streamed events from a server. If the cache
// is not being used, that stream uses memory on the server and network transfer
// between the client and the server.
// The materialize view and the stream are stopped when the cache entry expires,
// so using a shorter TTL ensures the cache entry expires sooner.
func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions {
opts := c.RegisterOptionsBlockingRefresh.RegisterOptions()
opts.LastGetTTL = 20 * time.Minute
return opts
}
// NewStreamingHealthServices creates a cache-type for watching for service
// health results via streaming updates.
func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {
return &StreamingHealthServices{deps: deps}
}
type MaterializerDeps struct {
Client submatview.StreamClient
Conn *grpc.ClientConn
Logger hclog.Logger
}
// Fetch service health from the materialized view. If no materialized view
// exists, create one and start it running in a goroutine. The goroutine will
// exit when the cache entry storing the result is expired, the cache will call
// Close on the result.State.
//
// Fetch implements part of the cache.Type interface, and assumes that the
// caller ensures that only a single call to Fetch is running at any time.
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
if opts.LastResult != nil && opts.LastResult.State != nil {
return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
}
srvReq := req.(*structs.ServiceSpecificRequest)
newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) pbsubscribe.SubscribeRequest {
return func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: srvReq.ServiceName,
@ -82,69 +35,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
}
return req
}
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
if err != nil {
return cache.FetchResult{}, err
}
ctx, cancel := context.WithCancel(context.TODO())
go materializer.Run(ctx)
state := &streamingHealthState{
materializer: materializer,
done: ctx.Done(),
cancel: cancel,
}
return state.Fetch(opts)
}
func newMaterializer(
deps MaterializerDeps,
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
req *structs.ServiceSpecificRequest,
) (*submatview.Materializer, error) {
view, err := newHealthView(req)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: deps.Client,
Logger: deps.Logger,
Waiter: &retry.Waiter{
MinFailures: 1,
// Start backing off with small increments (200-400ms) which will double
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
// after that). (retry.Wait applies Max limit after jitter right now).
Factor: 200 * time.Millisecond,
MinWait: 0,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
},
Request: newRequestFn,
}), nil
}
// streamingHealthState wraps a Materializer to manage its lifecycle, and to
// add itself to the FetchResult.State.
type streamingHealthState struct {
materializer *submatview.Materializer
done <-chan struct{}
cancel func()
}
func (s *streamingHealthState) Close() error {
s.cancel()
return nil
}
func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
result, err := s.materializer.Fetch(s.done, opts)
result.State = s
return result, err
}
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) {
fe, err := newFilterEvaluator(req)
if err != nil {
return nil, err
@ -197,7 +90,7 @@ type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error)
}
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) {
var evaluators []filterEvaluator
typ := reflect.TypeOf(structs.CheckServiceNode{})
@ -274,7 +167,7 @@ func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
}
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *healthView) Result(index uint64) (interface{}, error) {
func (s *healthView) Result(index uint64) interface{} {
result := structs.IndexedCheckServiceNodes{
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{
@ -286,7 +179,7 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
}
sortCheckServiceNodes(&result)
return &result, nil
return &result
}
func (s *healthView) Reset() {


@ -1,238 +1,29 @@
package cachetype
package health
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
namespace := pbcommon.DefaultEnterpriseMeta.Namespace
client := NewTestStreamingClient(namespace)
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
client.QueueEvents(newEndOfSnapshotEvent(1))
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
},
}
runStep(t, "empty snapshot returned", func(t *testing.T) {
// Fetch should return an empty
// result of the right type with a non-zero index, and in the background begin
// streaming updates.
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
client.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
client.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
start = time.Now()
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueErr(errors.New("invalid request"))
}()
// Next fetch should return the error
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
// We don't require instances to be returned in same order so we use
// elementsMatch which is recursive.
requireResultsSame(t,
opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
result.Value.(*structs.IndexedCheckServiceNodes),
)
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web"))
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
}
type tempError string
func (e tempError) Error() string {
return string(e)
}
func (e tempError) Temporary() bool {
return true
}
// requireResultsSame compares two IndexedCheckServiceNodes without requiring
// the same order of results (which vary due to map usage internally).
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
require.Equal(t, want.Index, got.Index)
svcIDs := func(csns structs.CheckServiceNodes) []string {
res := make([]string, 0, len(csns))
for _, csn := range csns {
res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
}
return res
}
gotIDs := svcIDs(got.Nodes)
wantIDs := svcIDs(want.Nodes)
require.ElementsMatch(t, wantIDs, gotIDs)
}
// getNamespace returns a namespace if namespace support exists, otherwise
// returns the empty string. It allows the same tests to work in both oss and ent
// without duplicating the tests.
func getNamespace(ns string) string {
meta := structs.NewEnterpriseMeta(ns)
return meta.NamespaceOrEmpty()
}
func TestOrderingConsistentWithMemDb(t *testing.T) {
func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
index := uint64(42)
buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
newID, err := uuid.GenerateUUID()
@ -270,29 +61,205 @@ func TestOrderingConsistentWithMemDb(t *testing.T) {
two := buildTestNode("node1", "testService:2")
three := buildTestNode("node2", "testService")
result := structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
three, two, zero, one,
},
QueryMeta: structs.QueryMeta{
Index: index,
},
Nodes: structs.CheckServiceNodes{three, two, zero, one},
QueryMeta: structs.QueryMeta{Index: index},
}
sortCheckServiceNodes(&result)
expected := structs.CheckServiceNodes{zero, one, two, three}
require.Equal(t, expected, result.Nodes)
}
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
streamClient := newStreamClient(validateNamespace(namespace))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := submatview.NewStore(hclog.New(nil))
go store.Run(ctx)
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
streamClient.QueueEvents(newEndOfSnapshotEvent(1))
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
},
},
streamClient: streamClient,
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
},
}
runStep(t, "empty snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
req.QueryOptions.MinQueryIndex = result.Index
})
var lastResultValue structs.CheckServiceNodes
runStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
}()
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should match the event index")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 1,
"result value should contain the new registration")
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
streamClient.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
"result index should not have changed")
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
"result value should not have changed")
req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
start = time.Now()
req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should match the event index")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 2,
"result value should contain the new registration")
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
streamClient.QueueErr(errors.New("invalid request"))
}()
// Next fetch should return the error
start := time.Now()
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)
req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web"))
req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should match the event index")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
"result value should contain the new registration")
req.QueryOptions.MinQueryIndex = result.Index
})
}
type tempError string
func (e tempError) Error() string {
return string(e)
}
func (e tempError) Temporary() bool {
return true
}
func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
namespace := getNamespace("ns2")
client := NewTestStreamingClient(namespace)
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
client := newStreamClient(validateNamespace(namespace))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := submatview.NewStore(hclog.New(nil))
// Create an initial snapshot of 3 instances on different nodes
registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
@ -304,37 +271,28 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
registerServiceWeb(5, 3),
newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
// Result will be sorted alphabetically the same way as memdb
return nodes
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
},
},
streamClient: client,
}
runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
expected := newExpectedNodes("node1", "node2", "node3")
expected.Index = 5
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index
opts.LastResult = &result
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "blocks until deregistration", func(t *testing.T) {
@ -348,8 +306,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
@ -358,10 +316,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
"Fetch should have returned before the timeout")
require.Equal(t, uint64(20), result.Index)
require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value))
expected := newExpectedNodes("node2", "node3")
expected.Index = 20
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index
opts.LastResult = &result
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "server reload is respected", func(t *testing.T) {
@ -379,18 +338,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
// Make another blocking query with THE SAME index. It should immediately
// return the new snapshot.
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index)
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
expected := newExpectedNodes("node3", "node4", "node5")
expected.Index = 50
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index
opts.LastResult = &result
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
@ -404,26 +364,54 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
newEndOfSnapshotEvent(50))
start := time.Now()
opts.MinIndex = 49
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
req.QueryOptions.MinQueryIndex = 49
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index)
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
expected := newExpectedNodes("node3", "node4", "node5")
expected.Index = 50
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
})
}
func TestStreamingHealthServices_EventBatches(t *testing.T) {
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
result := &structs.IndexedCheckServiceNodes{}
for _, node := range nodes {
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
Node: &structs.Node{Node: node},
})
}
return result
}
// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode
// by Node name.
var cmpCheckServiceNodeNames = cmp.Options{
cmp.Comparer(func(x, y structs.CheckServiceNode) bool {
return x.Node.Node == y.Node.Node
}),
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}
func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
namespace := getNamespace("ns3")
client := NewTestStreamingClient(namespace)
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
client := newStreamClient(validateNamespace(namespace))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := submatview.NewStore(hclog.New(nil))
// Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents(
@ -434,36 +422,28 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
batchEv,
newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
},
},
streamClient: client,
}
runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
expected := newExpectedNodes("node1", "node2", "node3")
expected.Index = 5
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "batched updates work too", func(t *testing.T) {
@ -476,92 +456,220 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
newEventServiceHealthRegister(20, 4, "web"),
)
client.QueueEvents(batchEv)
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(20), result.Index)
require.ElementsMatch(t, []string{"node2", "node3", "node4"},
gatherNodes(result.Value))
expected := newExpectedNodes("node2", "node3", "node4")
expected.Index = 20
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index
opts.LastResult = &result
req.QueryOptions.MinQueryIndex = result.Index
})
}
func TestStreamingHealthServices_Filtering(t *testing.T) {
func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
namespace := getNamespace("ns3")
client := NewTestStreamingClient(namespace)
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
streamClient := newStreamClient(validateNamespace(namespace))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := submatview.NewStore(hclog.New(nil))
go store.Run(ctx)
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
MaxQueryTime: time.Second,
},
},
},
streamClient: streamClient,
}
// Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"),
newEventServiceHealthRegister(5, 2, "web"),
newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents(
streamClient.QueueEvents(
batchEv,
newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
runStep(t, "filtered snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
expected := newExpectedNodes("node2")
expected.Index = 5
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index
opts.LastResult = &result
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "filtered updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same
// index)
// Simulate multiple registrations happening in one Txn (all have same index)
batchEv := newEventBatchWithEvents(
// Deregister an existing node
newEventServiceHealthDeregister(20, 1, "web"),
// Register another
newEventServiceHealthRegister(20, 4, "web"),
)
client.QueueEvents(batchEv)
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
streamClient.QueueEvents(batchEv)
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(20), result.Index)
require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
expected := newExpectedNodes("node2")
expected.Index = 20
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
})
}
// serviceRequestStub overrides NewMaterializer so that test can use a fake
// StreamClient.
type serviceRequestStub struct {
serviceRequest
streamClient submatview.StreamClient
}
func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) {
view, err := newHealthView(r.ServiceSpecificRequest)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: r.streamClient,
Logger: hclog.New(nil),
Request: newMaterializerRequest(r.ServiceSpecificRequest),
}), nil
}
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
ID: nodeID,
Node: node,
Address: addr,
Datacenter: "dc1",
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
}
}
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
events := make([]*pbsubscribe.Event, len(evs)+1)
events[0] = first
for i := range evs {
events[i+1] = evs[i]
}
return &pbsubscribe.Event{
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},
},
}
}
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
return &pbsubscribe.Event{
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}
// getNamespace returns a namespace if namespace support exists, otherwise
// returns the empty string. It allows the same tests to work in both oss and ent
// without duplicating the tests.
func getNamespace(ns string) string {
meta := structs.NewEnterpriseMeta(ns)
return meta.NamespaceOrEmpty()
}
func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
return func(request *pbsubscribe.SubscribeRequest) error {
if request.Namespace != ns {
return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace)
}
return nil
}
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
@ -578,7 +686,7 @@ func TestNewFilterEvaluator(t *testing.T) {
}
fn := func(t *testing.T, tc testCase) {
e, err := newFilterEvaluator(&tc.req)
e, err := newFilterEvaluator(tc.req)
require.NoError(t, err)
actual, err := e.Evaluate(tc.data)
require.NoError(t, err)


@ -8,31 +8,27 @@ import (
"sync"
"time"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog"
grpcresolver "google.golang.org/grpc/resolver"
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
)
@ -46,6 +42,7 @@ type BaseDeps struct {
MetricsHandler MetricsHandler
AutoConfig *autoconf.AutoConfig // TODO: use an interface
Cache *cache.Cache
ViewStore *submatview.Store
}
// MetricsHandler provides an http.Handler for displaying metrics.
@ -69,7 +66,10 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
if err != nil {
return d, err
}
grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger))
grpcLogInitOnce.Do(func() {
grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger))
})
for _, w := range result.Warnings {
d.Logger.Warn(w)
@ -100,6 +100,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
cfg.Cache.Logger = d.Logger.Named("cache")
// cache-types are not registered yet, but they won't be used until the components are started.
d.Cache = cache.New(cfg.Cache)
d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
builder := resolver.NewServerResolverBuilder(resolver.Config{})
@ -122,32 +123,12 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
return d, err
}
if err := registerCacheTypes(d); err != nil {
return d, err
}
return d, nil
}
// registerCacheTypes on bd.Cache.
//
// Note: most cache types are still registered in Agent.registerCache. This
// function is for registering newer cache-types which no longer have a dependency
// on Agent.
func registerCacheTypes(bd BaseDeps) error {
if bd.RuntimeConfig.UseStreamingBackend {
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
if err != nil {
return err
}
matDeps := cachetype.MaterializerDeps{
Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
Logger: bd.Logger,
}
bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps))
}
return nil
}
// grpcLogInitOnce guards the gRPC logger setup: the test suite calls NewBaseDeps in
// many tests, and re-initializing the logger causes data races.
var grpcLogInitOnce sync.Once
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
var rpcSrcAddr *net.TCPAddr


@ -10,7 +10,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
@ -32,7 +31,7 @@ type View interface {
// separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining
// indexes seen during Update.
Result(index uint64) (interface{}, error)
Result(index uint64) interface{}
// Reset the view to the zero state, done in preparation for receiving a new
// snapshot.
@ -80,6 +79,18 @@ func NewMaterializer(deps Deps) *Materializer {
retryWaiter: deps.Waiter,
updateCh: make(chan struct{}),
}
if v.retryWaiter == nil {
v.retryWaiter = &retry.Waiter{
MinFailures: 1,
// Start backing off with small increments (200-400ms) which will double
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
// after that). (retry.Wait applies Max limit after jitter right now).
Factor: 200 * time.Millisecond,
MinWait: 0,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
}
}
return v
}
@ -204,81 +215,61 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{})
}
// Fetch implements the logic a StreamingCacheType will need during its Fetch
// call. Cache types that use streaming should just be able to proxy to this
// once they have a subscription object and return its results directly.
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult
type Result struct {
Index uint64
Value interface{}
}
// Get current view Result and index
// getFromView blocks until the index of the View is greater than minIndex,
// or the context is cancelled.
func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result, error) {
m.lock.Lock()
index := m.index
val, err := m.view.Result(m.index)
result := Result{
Index: m.index,
Value: m.view.Result(m.index),
}
updateCh := m.updateCh
m.lock.Unlock()
if err != nil {
return result, err
}
result.Index = index
result.Value = val
// If our index is > req.Index return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan. Note it's opts.MinIndex that the cache is using here; the
// request min index might be different from the initial user request.
if index > 0 && index > opts.MinIndex {
// the update chan.
if result.Index > 0 && result.Index > minIndex {
return result, nil
}
// Watch for timeout of the Fetch. Note it's opts.Timeout, not req.Timeout,
// since that is the timeout the client requested from the cache Get, while the
// options one is the internal "background refresh" timeout, which is what the
// Fetch call should be using.
timeoutCh := time.After(opts.Timeout)
for {
select {
case <-updateCh:
// View updated, return the new result
m.lock.Lock()
result.Index = m.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = m.updateCh
fetchErr := m.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = m.view.Result(m.index)
}
m.lock.Unlock()
// If there was a non-transient error return it
if fetchErr != nil {
return result, fetchErr
}
if err != nil {
switch {
case m.err != nil:
err := m.err
m.lock.Unlock()
return result, err
}
// Sanity check the update is actually later than the one the user
// requested.
if result.Index <= opts.MinIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
case result.Index <= minIndex:
// get a reference to the new updateCh, the previous one was closed
updateCh = m.updateCh
m.lock.Unlock()
continue
}
// Return the updated result
result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, nil
case <-timeoutCh:
// Just return whatever we got originally, might still be empty
return result, nil
case <-done:
return result, context.Canceled
case <-ctx.Done():
// Update the result value to the latest because callers may still
// use the value when the error is context.DeadlineExceeded
m.lock.Lock()
result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, ctx.Err()
}
}
}

240 agent/submatview/store.go Normal file
View File

@@ -0,0 +1,240 @@
package submatview
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
)
// Store of Materializers. Store implements an interface similar to
// agent/cache.Cache, and allows a single Materializer to fulfil multiple requests
// as long as the requests are identical.
// Store is used in place of agent/cache.Cache because with the streaming
// backend there is no longer any need to run a background goroutine to refresh
// stored values.
type Store struct {
logger hclog.Logger
lock sync.RWMutex
byKey map[string]entry
// expiryHeap tracks entries with no remaining requests. Entries are ordered
// with the soonest expiry first.
expiryHeap *ttlcache.ExpiryHeap
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated. It is a field on the struct
// so that it can be patched in tests without needing a lock.
idleTTL time.Duration
}
type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
stop func()
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
requests int
}
// NewStore creates and returns a Store that is ready for use. The caller must
// call Store.Run (likely in a separate goroutine) to start the expiration loop.
func NewStore(logger hclog.Logger) *Store {
return &Store{
logger: logger,
byKey: make(map[string]entry),
expiryHeap: ttlcache.NewExpiryHeap(),
idleTTL: 20 * time.Minute,
}
}
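As a usage sketch, assuming only the exported API in this file: the Store can serve Get and Notify immediately, but idle entries are only expired once the caller starts Run, normally in its own goroutine.

```go
package main

import (
	"context"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/submatview"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run blocks until ctx is cancelled, so it is started in its own goroutine.
	store := submatview.NewStore(hclog.New(nil).Named("viewstore"))
	go store.Run(ctx)

	// store.Get and store.Notify can now be called with Request implementations.
}
```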
// Run the expiration loop until the context is cancelled.
func (s *Store) Run(ctx context.Context) {
for {
s.lock.RLock()
timer := s.expiryHeap.Next()
s.lock.RUnlock()
select {
case <-ctx.Done():
timer.Stop()
return
// the first item in the heap has changed, restart the timer with the
// new TTL.
case <-s.expiryHeap.NotifyCh:
timer.Stop()
continue
// the TTL for the first item has been reached, attempt an expiration.
case <-timer.Wait():
s.lock.Lock()
he := timer.Entry
s.expiryHeap.Remove(he.Index())
e := s.byKey[he.Key()]
// Only stop the materializer if there are no active requests.
if e.requests == 0 {
e.stop()
delete(s.byKey, he.Key())
}
s.lock.Unlock()
}
}
}
// Request is used to request data from the Store.
// Note that cache.Request is required, but some of the cache.RequestInfo
// fields are ignored (e.g. MaxAge and MustRevalidate).
type Request interface {
cache.Request
// NewMaterializer will be called if there is no active materializer to fulfil
// the request. It should return a Materializer appropriate for streaming
// data to fulfil this request.
NewMaterializer() (*Materializer, error)
// Type should return a string which uniquely identifies this type of request.
// The returned value is used as the prefix of the key used to index
// entries in the Store.
Type() string
}
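A hypothetical Request implementation might look like the sketch below. The type name, the embedded Deps field, the timeout, and the "service-health" prefix are all illustrative; the fakeRequest in the tests later in this diff is the in-tree example.

```go
package example

import (
	"time"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/submatview"
)

// serviceHealthRequest satisfies Request; Deps would be built by the caller
// with a View, a streaming Client, and a Logger (see the tests below).
type serviceHealthRequest struct {
	service    string
	datacenter string
	token      string
	minIndex   uint64
	deps       submatview.Deps
}

var _ submatview.Request = (*serviceHealthRequest)(nil)

func (r *serviceHealthRequest) CacheInfo() cache.RequestInfo {
	return cache.RequestInfo{
		Key:        r.service,
		Datacenter: r.datacenter,
		Token:      r.token,
		MinIndex:   r.minIndex,
		Timeout:    10 * time.Minute,
	}
}

func (r *serviceHealthRequest) NewMaterializer() (*submatview.Materializer, error) {
	return submatview.NewMaterializer(r.deps), nil
}

// Type is the prefix of the Store key, so identical requests share a Materializer.
func (r *serviceHealthRequest) Type() string { return "service-health" }
```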
// Get a value from the store, blocking if the view has not yet reached an index
// greater than req.CacheInfo().MinIndex.
// See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
info := req.CacheInfo()
key, materializer, err := s.readEntry(req)
if err != nil {
return Result{}, err
}
defer s.releaseEntry(key)
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
defer cancel()
result, err := materializer.getFromView(ctx, info.MinIndex)
// context.DeadlineExceeded is translated to nil to match the behaviour of
// agent/cache.Cache.Get.
if err == nil || errors.Is(err, context.DeadlineExceeded) {
return result, nil
}
return result, err
}
// Notify the updateCh when there are updates to the entry identified by req.
// See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead, set a deadline on the context.
func (s *Store) Notify(
ctx context.Context,
req Request,
correlationID string,
updateCh chan<- cache.UpdateEvent,
) error {
info := req.CacheInfo()
key, materializer, err := s.readEntry(req)
if err != nil {
return err
}
go func() {
defer s.releaseEntry(key)
index := info.MinIndex
for {
result, err := materializer.getFromView(ctx, index)
switch {
case ctx.Err() != nil:
return
case err != nil:
s.logger.Warn("handling error in Store.Notify",
"error", err,
"request-type", req.Type(),
"index", index)
continue
}
index = result.Index
u := cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index},
}
select {
case updateCh <- u:
case <-ctx.Done():
return
}
}
}()
return nil
}
// readEntry from the store, and increment the requests counter. releaseEntry
// must be called when the request is finished to decrement the counter.
func (s *Store) readEntry(req Request) (string, *Materializer, error) {
info := req.CacheInfo()
key := makeEntryKey(req.Type(), info)
s.lock.Lock()
defer s.lock.Unlock()
e, ok := s.byKey[key]
if ok {
e.requests++
s.byKey[key] = e
return key, e.materializer, nil
}
mat, err := req.NewMaterializer()
if err != nil {
return "", nil, err
}
ctx, cancel := context.WithCancel(context.Background())
go mat.Run(ctx)
e = entry{
materializer: mat,
stop: cancel,
requests: 1,
}
s.byKey[key] = e
return key, e.materializer, nil
}
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to readEntry.
func (s *Store) releaseEntry(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.requests--
s.byKey[key] = e
if e.requests > 0 {
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, s.idleTTL)
s.byKey[key] = e
return
}
s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
}
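Putting the two entry points together: a hedged sketch of a caller driving a single Store entry. The function name and correlation ID are made up; the entry key follows makeEntryKey above, e.g. "service-health/dc1/&lt;token&gt;/web".

```go
package example

import (
	"context"
	"fmt"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/submatview"
)

// watchService performs one blocking Get and then follows updates via Notify.
// Both calls resolve to the same entry because the request key is identical,
// so they share a single Materializer and its gRPC subscription.
func watchService(ctx context.Context, store *submatview.Store, req submatview.Request) error {
	// Get blocks until the view index passes req.CacheInfo().MinIndex, or returns
	// the current value when the request Timeout elapses (deadline errors are
	// translated to nil, matching agent/cache.Cache.Get).
	result, err := store.Get(ctx, req)
	if err != nil {
		return err
	}
	fmt.Println("initial index:", result.Index)

	// Notify streams further updates until ctx is cancelled. Once no Get or
	// Notify is using the entry, the idle TTL starts counting down.
	ch := make(chan cache.UpdateEvent)
	if err := store.Notify(ctx, req, "watch-service", ch); err != nil {
		return err
	}
	for {
		select {
		case update := <-ch:
			fmt.Println("update index:", update.Meta.Index)
		case <-ctx.Done():
			return nil
		}
	}
}
```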

View File

@@ -0,0 +1,458 @@
package submatview
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestStore_Get(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(
newEndOfSnapshotEvent(2),
newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1"))
runStep(t, "from empty store, starts materializer", func(t *testing.T) {
var result Result
retry.Run(t, func(r *retry.R) {
var err error
result, err = store.Get(ctx, req)
require.NoError(r, err)
require.Equal(r, uint64(22), result.Index)
})
r, ok := result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
runStep(t, "with an index that already exists in the view", func(t *testing.T) {
req.index = 21
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(22), result.Index)
r, ok := result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
chResult := make(chan resultOrError, 1)
req.index = 40
go func() {
result, err := store.Get(ctx, req)
chResult <- resultOrError{Result: result, Err: err}
}()
runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
select {
case <-chResult:
t.Fatalf("expected Get to block")
case <-time.After(50 * time.Millisecond):
}
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
require.Equal(t, 1, e.requests)
})
runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))
select {
case <-chResult:
t.Fatalf("expected Get to block")
case <-time.After(50 * time.Millisecond):
}
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
require.Equal(t, 1, e.requests)
})
runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1"))
var getResult resultOrError
select {
case getResult = <-chResult:
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected Get to unblock when new events are received")
}
require.NoError(t, getResult.Err)
require.Equal(t, uint64(41), getResult.Result.Index)
r, ok := getResult.Result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(41), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
}
type resultOrError struct {
Result Result
Err error
}
type fakeRequest struct {
index uint64
key string
client *TestStreamingClient
}
func (r *fakeRequest) CacheInfo() cache.RequestInfo {
key := r.key
if key == "" {
key = "key"
}
return cache.RequestInfo{
Key: key,
Token: "abcd",
Datacenter: "dc1",
Timeout: 4 * time.Second,
MinIndex: r.index,
}
}
func (r *fakeRequest) NewMaterializer() (*Materializer, error) {
return NewMaterializer(Deps{
View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
Client: r.client,
Logger: hclog.New(nil),
Request: func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "key",
Token: "abcd",
Datacenter: "dc1",
Index: index,
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
}
return req
},
}), nil
}
func (r *fakeRequest) Type() string {
return fmt.Sprintf("%T", r)
}
type fakeView struct {
srvs map[string]*pbservice.CheckServiceNode
}
func (f *fakeView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
return fmt.Errorf("unexpected event type for service health view: %T",
event.GetPayload())
}
id := serviceHealth.CheckServiceNode.UniqueID()
switch serviceHealth.Op {
case pbsubscribe.CatalogOp_Register:
f.srvs[id] = serviceHealth.CheckServiceNode
case pbsubscribe.CatalogOp_Deregister:
delete(f.srvs, id)
}
}
return nil
}
func (f *fakeView) Result(index uint64) interface{} {
srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs))
for _, srv := range f.srvs {
srvs = append(srvs, srv)
}
return fakeResult{srvs: srvs, index: index}
}
type fakeResult struct {
srvs []*pbservice.CheckServiceNode
index uint64
}
func (f *fakeView) Reset() {
f.srvs = make(map[string]*pbservice.CheckServiceNode)
}
func TestStore_Notify(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(
newEndOfSnapshotEvent(2),
newEventServiceHealthRegister(22, 2, "srv1"))
cID := "correlate"
ch := make(chan cache.UpdateEvent)
err := store.Notify(ctx, req, cID, ch)
require.NoError(t, err)
runStep(t, "from empty store, starts materializer", func(t *testing.T) {
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
require.Equal(t, 1, e.requests)
})
runStep(t, "updates are received", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
select {
case update := <-ch:
require.NoError(r, update.Err)
require.Equal(r, cID, update.CorrelationID)
require.Equal(r, uint64(22), update.Meta.Index)
require.Equal(r, uint64(22), update.Result.(fakeResult).index)
case <-time.After(100 * time.Millisecond):
r.Stop(fmt.Errorf("expected Get to unblock when new events are received"))
}
})
req.client.QueueEvents(newEventServiceHealthRegister(24, 2, "srv1"))
select {
case update := <-ch:
require.NoError(t, update.Err)
require.Equal(t, cID, update.CorrelationID)
require.Equal(t, uint64(24), update.Meta.Index)
require.Equal(t, uint64(24), update.Result.(fakeResult).index)
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected Get to unblock when new events are received")
}
})
runStep(t, "closing the notify starts the expiry counter", func(t *testing.T) {
cancel()
retry.Run(t, func(r *retry.R) {
store.lock.Lock()
defer store.lock.Unlock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(r, 0, e.expiry.Index())
require.Equal(r, 0, e.requests)
require.Equal(r, store.expiryHeap.Next().Entry, e.expiry)
})
})
}
func TestStore_Notify_ManyRequests(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(newEndOfSnapshotEvent(2))
cID := "correlate"
ch1 := make(chan cache.UpdateEvent)
ch2 := make(chan cache.UpdateEvent)
require.NoError(t, store.Notify(ctx, req, cID, ch1))
assertRequestCount(t, store, req, 1)
require.NoError(t, store.Notify(ctx, req, cID, ch2))
assertRequestCount(t, store, req, 2)
req.index = 15
go func() {
_, _ = store.Get(ctx, req)
}()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 3)
})
go func() {
_, _ = store.Get(ctx, req)
}()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 4)
})
var req2 *fakeRequest
runStep(t, "Get and Notify with a different key", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req2 = &fakeRequest{client: req.client, key: "key2", index: 22}
require.NoError(t, store.Notify(ctx, req2, cID, ch1))
go func() {
_, _ = store.Get(ctx, req2)
}()
// the original entry should still be at count 4
assertRequestCount(t, store, req, 4)
// the new entry should be at count 2
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req2, 2)
})
})
runStep(t, "end all the requests", func(t *testing.T) {
req.client.QueueEvents(
newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(12, 2, "srv1"),
newEventServiceHealthRegister(13, 1, "srv2"),
newEventServiceHealthRegister(16, 3, "srv2"))
// The two Get requests should exit now that the index has been updated
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 2)
})
// Cancel the context so all requests terminate
cancel()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 0)
})
})
runStep(t, "the expiry heap should contain two entries", func(t *testing.T) {
store.lock.Lock()
defer store.lock.Unlock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
e2 := store.byKey[makeEntryKey(req2.Type(), req2.CacheInfo())]
require.Equal(t, 0, e2.expiry.Index())
require.Equal(t, 1, e.expiry.Index())
require.Equal(t, store.expiryHeap.Next().Entry, e2.expiry)
})
}
type testingT interface {
Helper()
Fatalf(string, ...interface{})
}
func assertRequestCount(t testingT, s *Store, req Request, expected int) {
t.Helper()
key := makeEntryKey(req.Type(), req.CacheInfo())
s.lock.Lock()
defer s.lock.Unlock()
actual := s.byKey[key].requests
if actual != expected {
t.Fatalf("expected request count to be %d, got %d", expected, actual)
}
}
func TestStore_Run_ExpiresEntries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
ttl := 10 * time.Millisecond
store.idleTTL = ttl
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(newEndOfSnapshotEvent(2))
cID := "correlate"
ch1 := make(chan cache.UpdateEvent)
reqCtx, reqCancel := context.WithCancel(context.Background())
defer reqCancel()
require.NoError(t, store.Notify(reqCtx, req, cID, ch1))
assertRequestCount(t, store, req, 1)
// Get a copy of the entry so that we can check it was expired later
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
reqCancel()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 0)
})
// wait for the entry to expire, with lots of buffer
time.Sleep(3 * ttl)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 0)
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}

View File

@@ -1,7 +1,11 @@
package cachetype
package submatview
import (
"context"
"fmt"
"sync"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
@@ -9,6 +13,84 @@ import (
"github.com/hashicorp/consul/types"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
expectedNamespace string
subClients []*subscribeClient
lock sync.RWMutex
events []eventOrErr
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{expectedNamespace: ns}
}
func (s *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != s.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, s.expectedNamespace)
}
c := &subscribeClient{
events: make(chan eventOrErr, 32),
ctx: ctx,
}
s.lock.Lock()
s.subClients = append(s.subClients, c)
for _, event := range s.events {
c.events <- event
}
s.lock.Unlock()
return c, nil
}
type subscribeClient struct {
grpc.ClientStream
events chan eventOrErr
ctx context.Context
}
func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
s.lock.Lock()
for _, e := range events {
s.events = append(s.events, eventOrErr{Event: e})
for _, c := range s.subClients {
c.events <- eventOrErr{Event: e}
}
}
s.lock.Unlock()
}
func (s *TestStreamingClient) QueueErr(err error) {
s.lock.Lock()
s.events = append(s.events, eventOrErr{Err: err})
for _, c := range s.subClients {
c.events <- eventOrErr{Err: err}
}
s.lock.Unlock()
}
func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-c.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-c.ctx.Done():
return nil, c.ctx.Err()
}
}
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Index: index,
@@ -22,13 +104,6 @@ func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
}
}
// newEventServiceHealthRegister returns a realistically populated service
// health registration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d"), but it also changes
// the node ID and IP address to make the event a little more realistic for cases
// that need that. nodeNum should be less than 64k to keep the IP address
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
@@ -54,61 +129,17 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Checks: []*pbservice.HealthCheck{
{
Node: node,
CheckID: "serf-health",
Name: "serf-health",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
{
Node: node,
CheckID: types.CheckID("service:" + svc),
Name: "service:" + svc,
ServiceID: svc,
ServiceName: svc,
Type: "ttl",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
},
}
}
// newEventServiceHealthDeregister returns a realistically populated service
// health deregistration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d"), but it also changes
// the node ID and IP address to make the event a little more realistic for cases
// that need that. nodeNum should be less than 64k to keep the IP address
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
@@ -129,12 +160,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we

View File

@@ -34,6 +34,7 @@ type Failer interface {
// R provides context for the retryer.
type R struct {
fail bool
done bool
output []string
}
@@ -77,6 +78,12 @@ func (r *R) log(s string) {
r.output = append(r.output, decorate(s))
}
// Stop retrying, and fail the test with the specified error.
func (r *R) Stop(err error) {
r.log(err.Error())
r.done = true
}
func decorate(s string) string {
_, file, line, ok := runtime.Caller(3)
if ok {
@@ -120,6 +127,7 @@ func dedup(a []string) string {
func run(r Retryer, t Failer, f func(r *R)) {
t.Helper()
rr := &R{}
fail := func() {
t.Helper()
out := dedup(rr.output)
@@ -128,7 +136,8 @@ func run(r Retryer, t Failer, f func(r *R)) {
}
t.FailNow()
}
for r.NextOr(t, fail) {
for r.Continue() {
func() {
defer func() {
if p := recover(); p != nil && p != runFailed {
@@ -137,11 +146,17 @@ func run(r Retryer, t Failer, f func(r *R)) {
}()
f(rr)
}()
if !rr.fail {
switch {
case rr.done:
fail()
return
case !rr.fail:
return
}
rr.fail = false
}
fail()
}
// DefaultFailer provides default retry.Run() behavior for unit tests.
@@ -162,9 +177,9 @@ func ThreeTimes() *Counter {
// Retryer provides an interface for repeating operations
// until they succeed or an exit condition is met.
type Retryer interface {
// NextOr returns true if the operation should be repeated.
// Otherwise, it calls fail and returns false.
NextOr(t Failer, fail func()) bool
// Continue returns true if the operation should be repeated, otherwise it
// returns false to indicate retrying should stop.
Continue() bool
}
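With NextOr gone, a custom Retryer only needs to implement Continue. For illustration only (not part of this change), a hypothetical deadline-based retryer:

```go
package retry

import "time"

// Until retries until a fixed deadline passes, sleeping Wait between attempts.
type Until struct {
	Deadline time.Time
	Wait     time.Duration
	started  bool
}

func (u *Until) Continue() bool {
	if !u.started {
		u.started = true
		return true // the first attempt always runs immediately
	}
	if !time.Now().Before(u.Deadline) {
		return false // out of time; run() will report the failure
	}
	time.Sleep(u.Wait)
	return true
}
```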
// Counter repeats an operation a given number of
@@ -176,10 +191,8 @@ type Counter struct {
count int
}
func (r *Counter) NextOr(t Failer, fail func()) bool {
t.Helper()
func (r *Counter) Continue() bool {
if r.count == r.Count {
fail()
return false
}
if r.count > 0 {
@@ -200,14 +213,12 @@ type Timer struct {
stop time.Time
}
func (r *Timer) NextOr(t Failer, fail func()) bool {
t.Helper()
func (r *Timer) Continue() bool {
if r.stop.IsZero() {
r.stop = time.Now().Add(r.Timeout)
return true
}
if time.Now().After(r.stop) {
fail()
return false
}
time.Sleep(r.Wait)

View File

@@ -1,8 +1,11 @@
package retry
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// delta defines the time band a test run should complete in.
@@ -19,19 +22,15 @@ func TestRetryer(t *testing.T) {
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
var iters, fails int
fail := func() { fails++ }
var iters int
start := time.Now()
for tt.r.NextOr(t, fail) {
for tt.r.Continue() {
iters++
}
dur := time.Since(start)
if got, want := iters, 3; got != want {
t.Fatalf("got %d retries want %d", got, want)
}
if got, want := fails, 1; got != want {
t.Fatalf("got %d FailNow calls want %d", got, want)
}
// since the first iteration happens immediately
// the retryer waits only twice for three iterations.
// order of events: (true, (wait) true, (wait) true, false)
@@ -41,3 +40,52 @@
})
}
}
func TestRunWith(t *testing.T) {
t.Run("calls FailNow after exceeding retries", func(t *testing.T) {
ft := &fakeT{}
iter := 0
RunWith(&Counter{Count: 3, Wait: time.Millisecond}, ft, func(r *R) {
iter++
r.FailNow()
})
require.Equal(t, 3, iter)
require.Equal(t, 1, ft.fails)
})
t.Run("Stop ends the retrying", func(t *testing.T) {
ft := &fakeT{}
iter := 0
RunWith(&Counter{Count: 5, Wait: time.Millisecond}, ft, func(r *R) {
iter++
if iter == 2 {
r.Stop(fmt.Errorf("do not proceed"))
}
r.Fatalf("not yet")
})
require.Equal(t, 2, iter)
require.Equal(t, 1, ft.fails)
require.Len(t, ft.out, 1)
require.Contains(t, ft.out[0], "not yet\n")
require.Contains(t, ft.out[0], "do not proceed\n")
})
}
type fakeT struct {
fails int
out []string
}
func (f *fakeT) Helper() {}
func (f *fakeT) Log(args ...interface{}) {
f.out = append(f.out, fmt.Sprint(args...))
}
func (f *fakeT) FailNow() {
f.fails++
}
var _ Failer = &fakeT{}

View File

@@ -229,9 +229,10 @@ The table below shows this endpoint's support for
- `near` `(string: "")` - Specifies a node name to sort the node list in
ascending order based on the estimated round trip time from that node. Passing
`?near=_agent` will use the agent's node for the sort. This is specified as
part of the URL as a query parameter. **Note** that `near` cannot be used if
[`use_streaming_backend`](/docs/agent/options#use_streaming_backend)
is enabled, because the data is not available to sort the results.
part of the URL as a query parameter. **Note** that using `near` will ignore
[`use_streaming_backend`](/docs/agent/options#use_streaming_backend) and always
use blocking queries, because the data required to sort the results is not available
to the streaming backend.
- `tag` `(string: "")` **Deprecated** - Use `filter` with the `Service.Tags` selector instead.
This parameter will be removed in a future version of Consul.