mirror of https://github.com/status-im/consul.git
proxycfg-glue: server-local implementation of the `Health` interface
This is the OSS portion of enterprise PR 2249. This PR introduces an implementation of the proxycfg.Health interface based on a local materialized view of the health events. It reuses the view and request machinery from agent/rpcclient/health, which made it super straightforward.
This commit is contained in:
parent
3c533ceea8
commit
673d02d30f
|
@ -4220,7 +4220,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
|||
Datacenters: proxycfgglue.CacheDatacenters(a.cache),
|
||||
FederationStateListMeshGateways: proxycfgglue.CacheFederationStateListMeshGateways(a.cache),
|
||||
GatewayServices: proxycfgglue.CacheGatewayServices(a.cache),
|
||||
Health: proxycfgglue.Health(a.rpcClientHealth),
|
||||
Health: proxycfgglue.ClientHealth(a.rpcClientHealth),
|
||||
HTTPChecks: proxycfgglue.CacheHTTPChecks(a.cache),
|
||||
Intentions: proxycfgglue.CacheIntentions(a.cache),
|
||||
IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache),
|
||||
|
@ -4247,6 +4247,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
|||
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
|
||||
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
|
||||
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
|
||||
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/consul/watch"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/rpcclient/health"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
@ -139,25 +138,6 @@ func (c *cacheProxyDataSource[ReqType]) Notify(
|
|||
return c.c.NotifyCallback(ctx, c.t, req, correlationID, dispatchCacheUpdate(ch))
|
||||
}
|
||||
|
||||
// Health wraps health.Client so that the proxycfg package doesn't need to
|
||||
// reference cache.UpdateEvent directly.
|
||||
func Health(client *health.Client) proxycfg.Health {
|
||||
return &healthWrapper{client}
|
||||
}
|
||||
|
||||
type healthWrapper struct {
|
||||
client *health.Client
|
||||
}
|
||||
|
||||
func (h *healthWrapper) Notify(
|
||||
ctx context.Context,
|
||||
req *structs.ServiceSpecificRequest,
|
||||
correlationID string,
|
||||
ch chan<- proxycfg.UpdateEvent,
|
||||
) error {
|
||||
return h.client.Notify(ctx, *req, correlationID, dispatchCacheUpdate(ch))
|
||||
}
|
||||
|
||||
func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback {
|
||||
return func(ctx context.Context, e cache.UpdateEvent) {
|
||||
u := proxycfg.UpdateEvent{
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
package proxycfgglue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/rpcclient/health"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
)
|
||||
|
||||
// ClientHealth satisfies the proxycfg.Health interface by sourcing data from
|
||||
// the given health.Client.
|
||||
func ClientHealth(client *health.Client) proxycfg.Health {
|
||||
return &clientHealth{client}
|
||||
}
|
||||
|
||||
type clientHealth struct {
|
||||
client *health.Client
|
||||
}
|
||||
|
||||
func (h *clientHealth) Notify(
|
||||
ctx context.Context,
|
||||
req *structs.ServiceSpecificRequest,
|
||||
correlationID string,
|
||||
ch chan<- proxycfg.UpdateEvent,
|
||||
) error {
|
||||
return h.client.Notify(ctx, *req, correlationID, dispatchCacheUpdate(ch))
|
||||
}
|
||||
|
||||
// ServerHealth satisfies the proxycfg.Health interface by sourcing data from
|
||||
// a local materialized view (backed by an EventPublisher subscription).
|
||||
//
|
||||
// Requests for services in remote datacenters will be delegated to the given
|
||||
// remoteSource (i.e. ClientHealth).
|
||||
func ServerHealth(deps ServerDataSourceDeps, remoteSource proxycfg.Health) proxycfg.Health {
|
||||
return &serverHealth{deps, remoteSource}
|
||||
}
|
||||
|
||||
type serverHealth struct {
|
||||
deps ServerDataSourceDeps
|
||||
remoteSource proxycfg.Health
|
||||
}
|
||||
|
||||
func (h *serverHealth) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||
if req.Datacenter != h.deps.Datacenter {
|
||||
return h.remoteSource.Notify(ctx, req, correlationID, ch)
|
||||
}
|
||||
|
||||
return h.deps.ViewStore.NotifyCallback(
|
||||
ctx,
|
||||
&healthRequest{h.deps, *req},
|
||||
correlationID,
|
||||
dispatchCacheUpdate(ch),
|
||||
)
|
||||
}
|
||||
|
||||
type healthRequest struct {
|
||||
deps ServerDataSourceDeps
|
||||
req structs.ServiceSpecificRequest
|
||||
}
|
||||
|
||||
func (r *healthRequest) CacheInfo() cache.RequestInfo { return r.req.CacheInfo() }
|
||||
|
||||
func (r *healthRequest) NewMaterializer() (submatview.Materializer, error) {
|
||||
view, err := health.NewHealthView(r.req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
|
||||
Backend: r.deps.EventPublisher,
|
||||
ACLResolver: r.deps.ACLResolver,
|
||||
Deps: submatview.Deps{
|
||||
View: view,
|
||||
Logger: r.deps.Logger,
|
||||
Request: health.NewMaterializerRequest(r.req),
|
||||
},
|
||||
}), nil
|
||||
}
|
||||
|
||||
func (r *healthRequest) Type() string { return "proxycfgglue.Health" }
|
|
@ -0,0 +1,149 @@
|
|||
package proxycfgglue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestServerHealth(t *testing.T) {
|
||||
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
|
||||
correlationID = "correlation-id"
|
||||
ch = make(chan<- proxycfg.UpdateEvent)
|
||||
result = errors.New("KABOOM")
|
||||
)
|
||||
|
||||
remoteSource := newMockHealth(t)
|
||||
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
||||
|
||||
dataSource := ServerHealth(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
|
||||
err := dataSource.Notify(ctx, req, correlationID, ch)
|
||||
require.Equal(t, result, err)
|
||||
})
|
||||
|
||||
t.Run("local queries are served from a materialized view", func(t *testing.T) {
|
||||
// Note: the view is tested more thoroughly in the agent/rpcclient/health
|
||||
// package, so this is more of a high-level integration test with the local
|
||||
// materializer.
|
||||
const (
|
||||
index uint64 = 123
|
||||
datacenter = "dc1"
|
||||
serviceName = "web"
|
||||
)
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
store := submatview.NewStore(logger)
|
||||
go store.Run(ctx)
|
||||
|
||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||
publisher.RegisterHandler(pbsubscribe.Topic_ServiceHealth,
|
||||
func(stream.SubscribeRequest, stream.SnapshotAppender) (uint64, error) { return index, nil },
|
||||
true)
|
||||
go publisher.Run(ctx)
|
||||
|
||||
dataSource := ServerHealth(ServerDataSourceDeps{
|
||||
Datacenter: datacenter,
|
||||
ACLResolver: newStaticResolver(acl.ManageAll()),
|
||||
ViewStore: store,
|
||||
EventPublisher: publisher,
|
||||
Logger: logger,
|
||||
}, nil)
|
||||
|
||||
eventCh := make(chan proxycfg.UpdateEvent)
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
Datacenter: datacenter,
|
||||
ServiceName: serviceName,
|
||||
}, "", eventCh))
|
||||
|
||||
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Empty(t, result.Nodes)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "register services", func(t *testing.T) {
|
||||
publisher.Publish([]stream.Event{
|
||||
{
|
||||
Index: index + 1,
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: &state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node1"},
|
||||
Service: &structs.NodeService{Service: serviceName},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Index: index + 1,
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: &state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node2"},
|
||||
Service: &structs.NodeService{Service: serviceName},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Len(t, result.Nodes, 2)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "deregister service", func(t *testing.T) {
|
||||
publisher.Publish([]stream.Event{
|
||||
{
|
||||
Index: index + 2,
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Payload: &state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Deregister,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node2"},
|
||||
Service: &structs.NodeService{Service: serviceName},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func newMockHealth(t *testing.T) *mockHealth {
|
||||
mock := &mockHealth{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
type mockHealth struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockHealth) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||
return m.Called(ctx, req, correlationID, ch).Error(0)
|
||||
}
|
|
@ -136,14 +136,14 @@ func (r serviceRequest) Type() string {
|
|||
}
|
||||
|
||||
func (r serviceRequest) NewMaterializer() (submatview.Materializer, error) {
|
||||
view, err := newHealthView(r.ServiceSpecificRequest)
|
||||
view, err := NewHealthView(r.ServiceSpecificRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deps := submatview.Deps{
|
||||
View: view,
|
||||
Logger: r.deps.Logger,
|
||||
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
||||
Request: NewMaterializerRequest(r.ServiceSpecificRequest),
|
||||
}
|
||||
|
||||
return submatview.NewRPCMaterializer(pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn), deps), nil
|
||||
|
|
|
@ -21,7 +21,7 @@ type MaterializerDeps struct {
|
|||
Logger hclog.Logger
|
||||
}
|
||||
|
||||
func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest {
|
||||
func NewMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest {
|
||||
return func(index uint64) *pbsubscribe.SubscribeRequest {
|
||||
req := &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
|
@ -44,29 +44,29 @@ func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index ui
|
|||
}
|
||||
}
|
||||
|
||||
func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) {
|
||||
func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
|
||||
fe, err := newFilterEvaluator(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &healthView{
|
||||
return &HealthView{
|
||||
state: make(map[string]structs.CheckServiceNode),
|
||||
filter: fe,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// healthView implements submatview.View for storing the view state
|
||||
// HealthView implements submatview.View for storing the view state
|
||||
// of a service health result. We store it as a map to make updates and
|
||||
// deletions a little easier but we could just store a result type
|
||||
// (IndexedCheckServiceNodes) and update it in place for each event - that
|
||||
// involves re-sorting each time etc. though.
|
||||
type healthView struct {
|
||||
type HealthView struct {
|
||||
state map[string]structs.CheckServiceNode
|
||||
filter filterEvaluator
|
||||
}
|
||||
|
||||
// Update implements View
|
||||
func (s *healthView) Update(events []*pbsubscribe.Event) error {
|
||||
func (s *HealthView) Update(events []*pbsubscribe.Event) error {
|
||||
for _, event := range events {
|
||||
serviceHealth := event.GetServiceHealth()
|
||||
if serviceHealth == nil {
|
||||
|
@ -181,7 +181,7 @@ func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
|
|||
}
|
||||
|
||||
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
|
||||
func (s *healthView) Result(index uint64) interface{} {
|
||||
func (s *HealthView) Result(index uint64) interface{} {
|
||||
result := structs.IndexedCheckServiceNodes{
|
||||
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
|
||||
QueryMeta: structs.QueryMeta{
|
||||
|
@ -197,7 +197,7 @@ func (s *healthView) Result(index uint64) interface{} {
|
|||
return &result
|
||||
}
|
||||
|
||||
func (s *healthView) Reset() {
|
||||
func (s *HealthView) Reset() {
|
||||
s.state = make(map[string]structs.CheckServiceNode)
|
||||
}
|
||||
|
||||
|
|
|
@ -602,14 +602,14 @@ type serviceRequestStub struct {
|
|||
}
|
||||
|
||||
func (r serviceRequestStub) NewMaterializer() (submatview.Materializer, error) {
|
||||
view, err := newHealthView(r.ServiceSpecificRequest)
|
||||
view, err := NewHealthView(r.ServiceSpecificRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deps := submatview.Deps{
|
||||
View: view,
|
||||
Logger: hclog.New(nil),
|
||||
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
||||
Request: NewMaterializerRequest(r.ServiceSpecificRequest),
|
||||
}
|
||||
return submatview.NewRPCMaterializer(r.streamClient, deps), nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue