diff --git a/agent/agent.go b/agent/agent.go index 7aa42e3d94..5aa591f14b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4220,7 +4220,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { Datacenters: proxycfgglue.CacheDatacenters(a.cache), FederationStateListMeshGateways: proxycfgglue.CacheFederationStateListMeshGateways(a.cache), GatewayServices: proxycfgglue.CacheGatewayServices(a.cache), - Health: proxycfgglue.Health(a.rpcClientHealth), + Health: proxycfgglue.ClientHealth(a.rpcClientHealth), HTTPChecks: proxycfgglue.CacheHTTPChecks(a.cache), Intentions: proxycfgglue.CacheIntentions(a.cache), IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache), @@ -4247,6 +4247,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps) sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps) sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache)) + sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Intentions = proxycfgglue.ServerIntentions(deps) sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps) sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache)) diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 739cc7f649..e7924010af 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -12,7 +12,6 @@ import ( "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/watch" "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -139,25 +138,6 @@ func (c *cacheProxyDataSource[ReqType]) Notify( return c.c.NotifyCallback(ctx, c.t, req, correlationID, dispatchCacheUpdate(ch)) } -// Health wraps health.Client so that the proxycfg package doesn't need to -// reference cache.UpdateEvent directly. -func Health(client *health.Client) proxycfg.Health { - return &healthWrapper{client} -} - -type healthWrapper struct { - client *health.Client -} - -func (h *healthWrapper) Notify( - ctx context.Context, - req *structs.ServiceSpecificRequest, - correlationID string, - ch chan<- proxycfg.UpdateEvent, -) error { - return h.client.Notify(ctx, *req, correlationID, dispatchCacheUpdate(ch)) -} - func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback { return func(ctx context.Context, e cache.UpdateEvent) { u := proxycfg.UpdateEvent{ diff --git a/agent/proxycfg-glue/health.go b/agent/proxycfg-glue/health.go new file mode 100644 index 0000000000..331c8012b1 --- /dev/null +++ b/agent/proxycfg-glue/health.go @@ -0,0 +1,82 @@ +package proxycfgglue + +import ( + "context" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/rpcclient/health" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" +) + +// ClientHealth satisfies the proxycfg.Health interface by sourcing data from +// the given health.Client. +func ClientHealth(client *health.Client) proxycfg.Health { + return &clientHealth{client} +} + +type clientHealth struct { + client *health.Client +} + +func (h *clientHealth) Notify( + ctx context.Context, + req *structs.ServiceSpecificRequest, + correlationID string, + ch chan<- proxycfg.UpdateEvent, +) error { + return h.client.Notify(ctx, *req, correlationID, dispatchCacheUpdate(ch)) +} + +// ServerHealth satisfies the proxycfg.Health interface by sourcing data from +// a local materialized view (backed by an EventPublisher subscription). +// +// Requests for services in remote datacenters will be delegated to the given +// remoteSource (i.e. ClientHealth). +func ServerHealth(deps ServerDataSourceDeps, remoteSource proxycfg.Health) proxycfg.Health { + return &serverHealth{deps, remoteSource} +} + +type serverHealth struct { + deps ServerDataSourceDeps + remoteSource proxycfg.Health +} + +func (h *serverHealth) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { + if req.Datacenter != h.deps.Datacenter { + return h.remoteSource.Notify(ctx, req, correlationID, ch) + } + + return h.deps.ViewStore.NotifyCallback( + ctx, + &healthRequest{h.deps, *req}, + correlationID, + dispatchCacheUpdate(ch), + ) +} + +type healthRequest struct { + deps ServerDataSourceDeps + req structs.ServiceSpecificRequest +} + +func (r *healthRequest) CacheInfo() cache.RequestInfo { return r.req.CacheInfo() } + +func (r *healthRequest) NewMaterializer() (submatview.Materializer, error) { + view, err := health.NewHealthView(r.req) + if err != nil { + return nil, err + } + return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{ + Backend: r.deps.EventPublisher, + ACLResolver: r.deps.ACLResolver, + Deps: submatview.Deps{ + View: view, + Logger: r.deps.Logger, + Request: health.NewMaterializerRequest(r.req), + }, + }), nil +} + +func (r *healthRequest) Type() string { return "proxycfgglue.Health" } diff --git a/agent/proxycfg-glue/health_test.go b/agent/proxycfg-glue/health_test.go new file mode 100644 index 0000000000..b4e6035ee5 --- /dev/null +++ b/agent/proxycfg-glue/health_test.go @@ -0,0 +1,149 @@ +package proxycfgglue + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" + "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/sdk/testutil" +) + +func TestServerHealth(t *testing.T) { + t.Run("remote queries are delegated to the remote source", func(t *testing.T) { + var ( + ctx = context.Background() + req = &structs.ServiceSpecificRequest{Datacenter: "dc2"} + correlationID = "correlation-id" + ch = make(chan<- proxycfg.UpdateEvent) + result = errors.New("KABOOM") + ) + + remoteSource := newMockHealth(t) + remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result) + + dataSource := ServerHealth(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource) + err := dataSource.Notify(ctx, req, correlationID, ch) + require.Equal(t, result, err) + }) + + t.Run("local queries are served from a materialized view", func(t *testing.T) { + // Note: the view is tested more thoroughly in the agent/rpcclient/health + // package, so this is more of a high-level integration test with the local + // materializer. + const ( + index uint64 = 123 + datacenter = "dc1" + serviceName = "web" + ) + + logger := testutil.Logger(t) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := submatview.NewStore(logger) + go store.Run(ctx) + + publisher := stream.NewEventPublisher(10 * time.Second) + publisher.RegisterHandler(pbsubscribe.Topic_ServiceHealth, + func(stream.SubscribeRequest, stream.SnapshotAppender) (uint64, error) { return index, nil }, + true) + go publisher.Run(ctx) + + dataSource := ServerHealth(ServerDataSourceDeps{ + Datacenter: datacenter, + ACLResolver: newStaticResolver(acl.ManageAll()), + ViewStore: store, + EventPublisher: publisher, + Logger: logger, + }, nil) + + eventCh := make(chan proxycfg.UpdateEvent) + require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{ + Datacenter: datacenter, + ServiceName: serviceName, + }, "", eventCh)) + + testutil.RunStep(t, "initial state", func(t *testing.T) { + result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) + require.Empty(t, result.Nodes) + }) + + testutil.RunStep(t, "register services", func(t *testing.T) { + publisher.Publish([]stream.Event{ + { + Index: index + 1, + Topic: pbsubscribe.Topic_ServiceHealth, + Payload: &state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node1"}, + Service: &structs.NodeService{Service: serviceName}, + }, + }, + }, + { + Index: index + 1, + Topic: pbsubscribe.Topic_ServiceHealth, + Payload: &state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node2"}, + Service: &structs.NodeService{Service: serviceName}, + }, + }, + }, + }) + + result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) + require.Len(t, result.Nodes, 2) + }) + + testutil.RunStep(t, "deregister service", func(t *testing.T) { + publisher.Publish([]stream.Event{ + { + Index: index + 2, + Topic: pbsubscribe.Topic_ServiceHealth, + Payload: &state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node2"}, + Service: &structs.NodeService{Service: serviceName}, + }, + }, + }, + }) + + result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) + require.Len(t, result.Nodes, 1) + }) + }) +} + +func newMockHealth(t *testing.T) *mockHealth { + mock := &mockHealth{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +type mockHealth struct { + mock.Mock +} + +func (m *mockHealth) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { + return m.Called(ctx, req, correlationID, ch).Error(0) +} diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index dd4be64cef..a4bdae78a9 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -136,14 +136,14 @@ func (r serviceRequest) Type() string { } func (r serviceRequest) NewMaterializer() (submatview.Materializer, error) { - view, err := newHealthView(r.ServiceSpecificRequest) + view, err := NewHealthView(r.ServiceSpecificRequest) if err != nil { return nil, err } deps := submatview.Deps{ View: view, Logger: r.deps.Logger, - Request: newMaterializerRequest(r.ServiceSpecificRequest), + Request: NewMaterializerRequest(r.ServiceSpecificRequest), } return submatview.NewRPCMaterializer(pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn), deps), nil diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index fa591b7b72..fd19cb4a00 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -21,7 +21,7 @@ type MaterializerDeps struct { Logger hclog.Logger } -func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest { +func NewMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest { return func(index uint64) *pbsubscribe.SubscribeRequest { req := &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, @@ -44,29 +44,29 @@ func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index ui } } -func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) { +func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) { fe, err := newFilterEvaluator(req) if err != nil { return nil, err } - return &healthView{ + return &HealthView{ state: make(map[string]structs.CheckServiceNode), filter: fe, }, nil } -// healthView implements submatview.View for storing the view state +// HealthView implements submatview.View for storing the view state // of a service health result. We store it as a map to make updates and // deletions a little easier but we could just store a result type // (IndexedCheckServiceNodes) and update it in place for each event - that // involves re-sorting each time etc. though. -type healthView struct { +type HealthView struct { state map[string]structs.CheckServiceNode filter filterEvaluator } // Update implements View -func (s *healthView) Update(events []*pbsubscribe.Event) error { +func (s *HealthView) Update(events []*pbsubscribe.Event) error { for _, event := range events { serviceHealth := event.GetServiceHealth() if serviceHealth == nil { @@ -181,7 +181,7 @@ func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) { } // Result returns the structs.IndexedCheckServiceNodes stored by this view. -func (s *healthView) Result(index uint64) interface{} { +func (s *HealthView) Result(index uint64) interface{} { result := structs.IndexedCheckServiceNodes{ Nodes: make(structs.CheckServiceNodes, 0, len(s.state)), QueryMeta: structs.QueryMeta{ @@ -197,7 +197,7 @@ func (s *healthView) Result(index uint64) interface{} { return &result } -func (s *healthView) Reset() { +func (s *HealthView) Reset() { s.state = make(map[string]structs.CheckServiceNode) } diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index ddc2afc1aa..8fcb50da33 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -602,14 +602,14 @@ type serviceRequestStub struct { } func (r serviceRequestStub) NewMaterializer() (submatview.Materializer, error) { - view, err := newHealthView(r.ServiceSpecificRequest) + view, err := NewHealthView(r.ServiceSpecificRequest) if err != nil { return nil, err } deps := submatview.Deps{ View: view, Logger: hclog.New(nil), - Request: newMaterializerRequest(r.ServiceSpecificRequest), + Request: NewMaterializerRequest(r.ServiceSpecificRequest), } return submatview.NewRPCMaterializer(r.streamClient, deps), nil }