mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 06:16:08 +00:00
653b8c4f9d
This is the OSS portion of enterprise PR 2056. This commit provides server-local implementations of the proxycfg.ConfigEntry and proxycfg.ConfigEntryList interfaces that source data from streaming events. It makes use of the LocalMaterializer type introduced for peering replication, adding the necessary support for authorization. It also adds support for "wildcard" subscriptions (within a topic) to the event publisher, as this is needed to fetch service-resolvers for all services when configuring mesh gateways. Currently, events will be emitted for just the ingress-gateway, service-resolver, and mesh config entry types, as these are the only entries required by proxycfg — the events will be emitted on topics named IngressGateway, ServiceResolver, and MeshConfig respectively. Though these events will only be consumed "locally" for now, they can also be consumed via the gRPC endpoint (confirmed using grpcurl), so using them from client agents should be a case of swapping the LocalMaterializer for an RPCMaterializer.
234 lines
6.2 KiB
Go
234 lines
6.2 KiB
Go
package health
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/hashicorp/go-bexpr"
|
|
"github.com/hashicorp/go-hclog"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/pbservice"
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
)
|
|
|
|
type MaterializerDeps struct {
|
|
Conn *grpc.ClientConn
|
|
Logger hclog.Logger
|
|
}
|
|
|
|
func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest {
|
|
return func(index uint64) *pbsubscribe.SubscribeRequest {
|
|
req := &pbsubscribe.SubscribeRequest{
|
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
|
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{
|
|
NamedSubject: &pbsubscribe.NamedSubject{
|
|
Key: srvReq.ServiceName,
|
|
Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(),
|
|
Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(),
|
|
PeerName: srvReq.PeerName,
|
|
},
|
|
},
|
|
Token: srvReq.Token,
|
|
Datacenter: srvReq.Datacenter,
|
|
Index: index,
|
|
}
|
|
if srvReq.Connect {
|
|
req.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
|
}
|
|
return req
|
|
}
|
|
}
|
|
|
|
func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) {
|
|
fe, err := newFilterEvaluator(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &healthView{
|
|
state: make(map[string]structs.CheckServiceNode),
|
|
filter: fe,
|
|
}, nil
|
|
}
|
|
|
|
// healthView implements submatview.View for storing the view state
|
|
// of a service health result. We store it as a map to make updates and
|
|
// deletions a little easier but we could just store a result type
|
|
// (IndexedCheckServiceNodes) and update it in place for each event - that
|
|
// involves re-sorting each time etc. though.
|
|
type healthView struct {
|
|
state map[string]structs.CheckServiceNode
|
|
filter filterEvaluator
|
|
}
|
|
|
|
// Update implements View
|
|
func (s *healthView) Update(events []*pbsubscribe.Event) error {
|
|
for _, event := range events {
|
|
serviceHealth := event.GetServiceHealth()
|
|
if serviceHealth == nil {
|
|
return fmt.Errorf("unexpected event type for service health view: %T",
|
|
event.GetPayload())
|
|
}
|
|
|
|
id := serviceHealth.CheckServiceNode.UniqueID()
|
|
switch serviceHealth.Op {
|
|
case pbsubscribe.CatalogOp_Register:
|
|
csn, err := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if csn == nil {
|
|
return errors.New("check service node was unexpectedly nil")
|
|
}
|
|
passed, err := s.filter.Evaluate(*csn)
|
|
if err != nil {
|
|
return err
|
|
} else if passed {
|
|
s.state[id] = *csn
|
|
} else {
|
|
delete(s.state, id)
|
|
}
|
|
|
|
case pbsubscribe.CatalogOp_Deregister:
|
|
delete(s.state, id)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// filterEvaluator reports whether a single datum passes a filter. In this
// file a true result means the item is kept in the view, and a false result
// means it is filtered out.
type filterEvaluator interface {
	// Evaluate returns true when datum matches the filter, or an error when
	// the datum could not be evaluated.
	Evaluate(datum interface{}) (bool, error)
}
|
|
|
|
func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) {
|
|
var evaluators []filterEvaluator
|
|
|
|
typ := reflect.TypeOf(structs.CheckServiceNode{})
|
|
if req.Filter != "" {
|
|
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
evaluators = append(evaluators, e)
|
|
}
|
|
|
|
if req.ServiceTag != "" {
|
|
// Handle backwards compat with old field
|
|
req.ServiceTags = []string{req.ServiceTag}
|
|
}
|
|
|
|
if req.TagFilter && len(req.ServiceTags) > 0 {
|
|
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
|
|
}
|
|
|
|
for key, value := range req.NodeMetaFilters {
|
|
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
|
|
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
evaluators = append(evaluators, e)
|
|
}
|
|
|
|
switch len(evaluators) {
|
|
case 0:
|
|
return noopFilterEvaluator{}, nil
|
|
case 1:
|
|
return evaluators[0], nil
|
|
default:
|
|
return &multiFilterEvaluator{evaluators: evaluators}, nil
|
|
}
|
|
}
|
|
|
|
// noopFilterEvaluator may be used in place of a bexpr.Evaluator. Its Evaluate
// method always returns true, so no items are ever filtered out.
type noopFilterEvaluator struct{}

// Evaluate ignores its argument and reports a match with a nil error.
func (noopFilterEvaluator) Evaluate(interface{}) (bool, error) {
	return true, nil
}
|
|
|
|
type multiFilterEvaluator struct {
|
|
evaluators []filterEvaluator
|
|
}
|
|
|
|
func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
|
|
for _, e := range m.evaluators {
|
|
match, err := e.Evaluate(data)
|
|
if !match || err != nil {
|
|
return match, err
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// sortCheckServiceNodes sorts the results to match memdb semantics
|
|
// Sort results by Node.Node, if 2 instances match, order by Service.ID
|
|
// Will allow result to be stable sorted and match queries without cache
|
|
func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
|
|
sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool {
|
|
left := serviceNodes.Nodes[i]
|
|
right := serviceNodes.Nodes[j]
|
|
if left.Node.Node == right.Node.Node {
|
|
return left.Service.ID < right.Service.ID
|
|
}
|
|
return left.Node.Node < right.Node.Node
|
|
})
|
|
}
|
|
|
|
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
|
|
func (s *healthView) Result(index uint64) interface{} {
|
|
result := structs.IndexedCheckServiceNodes{
|
|
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
|
|
QueryMeta: structs.QueryMeta{
|
|
Index: index,
|
|
Backend: structs.QueryBackendStreaming,
|
|
},
|
|
}
|
|
for _, node := range s.state {
|
|
result.Nodes = append(result.Nodes, node)
|
|
}
|
|
sortCheckServiceNodes(&result)
|
|
|
|
return &result
|
|
}
|
|
|
|
func (s *healthView) Reset() {
|
|
s.state = make(map[string]structs.CheckServiceNode)
|
|
}
|
|
|
|
// serviceTagEvaluator implements filterEvaluator to filter by service tags.
// bexpr cannot be used for this at present because, for backwards
// compatibility, tag matching must be case insensitive. It may be replaced
// with bexpr in the future, once bexpr gains case-insensitive support.
type serviceTagEvaluator struct {
	// tags lists the tags a service must carry; all of them are required.
	tags []string
}
|
|
|
|
func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
|
|
csn, ok := data.(structs.CheckServiceNode)
|
|
if !ok {
|
|
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
|
|
}
|
|
for _, tag := range m.tags {
|
|
if !serviceHasTag(csn.Service, tag) {
|
|
// If any one of the expected tags was not found, filter the service
|
|
return false, nil
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func serviceHasTag(sn *structs.NodeService, tag string) bool {
|
|
for _, t := range sn.Tags {
|
|
if strings.EqualFold(t, tag) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|