rpcclient/health: integrate submatview.Store into rpcclient/health

Daniel Nephin 2021-02-25 16:22:30 -05:00
parent 6f29fa0de8
commit 55a677b7d1
4 changed files with 71 additions and 147 deletions
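For orientation, the following is a minimal caller-side sketch, not part of the commit: it assumes a health.Client that has already been wired with a ViewStore and MaterializerDeps as shown in the hunks below, and the import path and return types are inferred rather than taken directly from this diff.

package example

import (
    "context"
    "fmt"

    "github.com/hashicorp/consul/agent/rpcclient/health"
    "github.com/hashicorp/consul/agent/structs"
)

// queryWeb is illustrative only. After this commit, eligible requests are
// served from the shared submatview.Store instead of the per-entry streaming
// cache-type; other requests still go through the agent cache.
func queryWeb(ctx context.Context, client *health.Client) error {
    nodes, meta, err := client.ServiceNodes(ctx, structs.ServiceSpecificRequest{
        Datacenter:  "dc1",
        ServiceName: "web",
    })
    if err != nil {
        return err
    }
    fmt.Printf("index=%d instances=%d\n", meta.Index, len(nodes.Nodes))
    return nil
}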

View File

@@ -5,11 +5,14 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
type Client struct {
NetRPC NetRPC
Cache CacheGetter
ViewStore MaterializedViewStore
MaterializerDeps MaterializerDeps
// CacheName to use for service health.
CacheName string
// CacheNameIngress is the name of the cache type to use for ingress
@@ -26,6 +29,11 @@ type CacheGetter interface {
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
}
type MaterializedViewStore interface {
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
}
func (c *Client) ServiceNodes(
ctx context.Context,
req structs.ServiceSpecificRequest,
@@ -56,6 +64,20 @@ func (c *Client) getServiceNodes(
return out, cache.ResultMeta{}, err
}
if req.Source.Node == "" {
sr, err := newServiceRequest(req, c.MaterializerDeps)
if err != nil {
return out, cache.ResultMeta{}, err
}
result, err := c.ViewStore.Get(ctx, sr)
if err != nil {
return out, cache.ResultMeta{}, err
}
// TODO: can we store a non-pointer value?
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
}
cacheName := c.CacheName
if req.Ingress {
cacheName = c.CacheNameIngress
@@ -86,3 +108,38 @@ func (c *Client) Notify(
}
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
}
func newServiceRequest(req structs.ServiceSpecificRequest, deps MaterializerDeps) (serviceRequest, error) {
view, err := newHealthView(req)
if err != nil {
return serviceRequest{}, err
}
return serviceRequest{
ServiceSpecificRequest: req,
view: view,
deps: deps,
}, nil
}
type serviceRequest struct {
structs.ServiceSpecificRequest
view *healthView
deps MaterializerDeps
}
func (r serviceRequest) CacheInfo() cache.RequestInfo {
return r.ServiceSpecificRequest.CacheInfo()
}
func (r serviceRequest) Type() string {
return "service-health"
}
func (r serviceRequest) NewMaterializer() *submatview.Materializer {
return submatview.NewMaterializer(submatview.Deps{
View: r.view,
Client: r.deps.Client,
Logger: r.deps.Logger,
Request: newMaterializerRequest(r.ServiceSpecificRequest),
})
}
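serviceRequest implements CacheInfo, Type, and NewMaterializer, which is presumably what the view store's submatview.Request contract asks for. Based only on the methods above, that contract would look roughly like the sketch below; this is an inference for illustration, not the actual definition in the submatview package.

package example

import (
    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/submatview"
)

// Request is an inferred sketch of what submatview.Store appears to require
// from request types such as serviceRequest. The real interface lives in the
// submatview package and may differ in name and shape.
type Request interface {
    // CacheInfo supplies the key, datacenter, token, and minimum index used
    // to locate or create a materialized-view entry in the store.
    CacheInfo() cache.RequestInfo
    // Type distinguishes kinds of views; serviceRequest returns "service-health".
    Type() string
    // NewMaterializer builds the materializer that maintains the view when no
    // entry exists yet for this request.
    NewMaterializer() *submatview.Materializer
}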

View File

@@ -1,74 +1,27 @@
package cachetype
package health
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"time"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
const (
// Recommended name for registration.
StreamingHealthServicesName = "streaming-health-services"
)
// StreamingHealthServices supports discovering service instances via the
// catalog using the streaming gRPC endpoint.
type StreamingHealthServices struct {
RegisterOptionsBlockingRefresh
deps MaterializerDeps
}
// RegisterOptions returns options with a much shorter LastGetTTL than the default.
// Unlike other cache-types, StreamingHealthServices runs a materialized view in
// the background which will receive streamed events from a server. If the cache
// is not being used, that stream uses memory on the server and network transfer
// between the client and the server.
// The materialized view and the stream are stopped when the cache entry expires,
// so using a shorter TTL ensures the cache entry expires sooner.
func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions {
opts := c.RegisterOptionsBlockingRefresh.RegisterOptions()
opts.LastGetTTL = 20 * time.Minute
return opts
}
// NewStreamingHealthServices creates a cache-type for watching for service
// health results via streaming updates.
func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {
return &StreamingHealthServices{deps: deps}
}
type MaterializerDeps struct {
Client submatview.StreamClient
Logger hclog.Logger
}
// Fetch service health from the materialized view. If no materialized view
// exists, create one and start it running in a goroutine. The goroutine will
// exit when the cache entry storing the result expires; the cache will call
// Close on the result.State.
//
// Fetch implements part of the cache.Type interface, and assumes that the
// caller ensures that only a single call to Fetch is running at any time.
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
if opts.LastResult != nil && opts.LastResult.State != nil {
return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
}
srvReq := req.(*structs.ServiceSpecificRequest)
newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) pbsubscribe.SubscribeRequest {
return func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: srvReq.ServiceName,
@@ -82,69 +35,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
}
return req
}
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
if err != nil {
return cache.FetchResult{}, err
}
ctx, cancel := context.WithCancel(context.TODO())
go materializer.Run(ctx)
state := &streamingHealthState{
materializer: materializer,
done: ctx.Done(),
cancel: cancel,
}
return state.Fetch(opts)
}
func newMaterializer(
deps MaterializerDeps,
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
req *structs.ServiceSpecificRequest,
) (*submatview.Materializer, error) {
view, err := newHealthView(req)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: deps.Client,
Logger: deps.Logger,
Waiter: &retry.Waiter{
MinFailures: 1,
// Start backing off with small increments (200-400ms) which double on
// each attempt (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
// after that). (retry.Wait applies the Max limit after jitter right now.)
Factor: 200 * time.Millisecond,
MinWait: 0,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
},
Request: newRequestFn,
}), nil
}
// streamingHealthState wraps a Materializer to manage its lifecycle, and to
// add itself to the FetchResult.State.
type streamingHealthState struct {
materializer *submatview.Materializer
done <-chan struct{}
cancel func()
}
func (s *streamingHealthState) Close() error {
s.cancel()
return nil
}
func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
result, err := s.materializer.getFromView(s.done, opts)
result.State = s
return result, err
}
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) {
fe, err := newFilterEvaluator(req)
if err != nil {
return nil, err
@@ -197,7 +90,7 @@ type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error)
}
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) {
var evaluators []filterEvaluator
typ := reflect.TypeOf(structs.CheckServiceNode{})
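Since newFilterEvaluator now takes the request by value, a call site looks like the sketch below. The helper, the filter expression, and the CheckServiceNode value are illustrative, and the snippet would have to sit in this package because newFilterEvaluator is unexported.

package health

import "github.com/hashicorp/consul/agent/structs"

// filterOne is a hypothetical helper, not part of this commit, showing the
// by-value signature introduced above. csn stands in for a CheckServiceNode
// produced by the materialized view.
func filterOne(csn structs.CheckServiceNode) (bool, error) {
    e, err := newFilterEvaluator(structs.ServiceSpecificRequest{
        ServiceName:  "web",
        QueryOptions: structs.QueryOptions{Filter: "Service.Meta.version == `v2`"},
    })
    if err != nil {
        return false, err
    }
    return e.Evaluate(csn)
}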

View File

@@ -589,7 +589,7 @@ func TestNewFilterEvaluator(t *testing.T) {
}
fn := func(t *testing.T, tc testCase) {
e, err := newFilterEvaluator(&tc.req)
e, err := newFilterEvaluator(tc.req)
require.NoError(t, err)
actual, err := e.Evaluate(tc.data)
require.NoError(t, err)

View File

@@ -8,31 +8,27 @@ import (
"sync"
"time"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog"
grpcresolver "google.golang.org/grpc/resolver"
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
)
@@ -46,6 +42,7 @@ type BaseDeps struct {
MetricsHandler MetricsHandler
AutoConfig *autoconf.AutoConfig // TODO: use an interface
Cache *cache.Cache
ViewStore *submatview.Store
}
// MetricsHandler provides an http.Handler for displaying metrics.
@@ -100,6 +97,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
cfg.Cache.Logger = d.Logger.Named("cache")
// cache-types are not registered yet, but they won't be used until the components are started.
d.Cache = cache.New(cfg.Cache)
d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
builder := resolver.NewServerResolverBuilder(resolver.Config{})
@@ -122,33 +120,9 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
return d, err
}
if err := registerCacheTypes(d); err != nil {
return d, err
}
return d, nil
}
// registerCacheTypes on bd.Cache.
//
// Note: most cache types are still registered in Agent.registerCache. This
// function is for registering newer cache-types which no longer have a dependency
// on Agent.
func registerCacheTypes(bd BaseDeps) error {
if bd.RuntimeConfig.UseStreamingBackend {
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
if err != nil {
return err
}
matDeps := cachetype.MaterializerDeps{
Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
Logger: bd.Logger,
}
bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps))
}
return nil
}
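With registerCacheTypes removed, the streaming dependencies presumably get built wherever the health client itself is constructed. The sketch below is a hypothetical replacement, not code from this commit: it reuses only the calls that appear in the removed function and the MaterializerDeps type from the first file, while the helper name, its placement in the agent package, and the rpcclient/health import path are assumptions.

package agent

import (
    "github.com/hashicorp/consul/agent/rpcclient/health"
    "github.com/hashicorp/consul/proto/pbsubscribe"
)

// newHealthMaterializerDeps is a hypothetical helper illustrating where the
// wiring formerly done by registerCacheTypes could move. The resulting deps,
// together with bd.ViewStore, would be handed to the rpcclient/health Client.
func newHealthMaterializerDeps(bd BaseDeps) (health.MaterializerDeps, error) {
    conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
    if err != nil {
        return health.MaterializerDeps{}, err
    }
    return health.MaterializerDeps{
        Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
        Logger: bd.Logger,
    }, nil
}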
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
var rpcSrcAddr *net.TCPAddr
if !ipaddr.IsAny(config.RPCBindAddr) {