250 lines
6.7 KiB
Go
Raw Normal View History

package health
import (
"errors"
"fmt"
"reflect"
"sort"
"strings"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/structs"
Protobuf Refactoring for Multi-Module Cleanliness (#16302) Protobuf Refactoring for Multi-Module Cleanliness This commit includes the following: Moves all packages that were within proto/ to proto/private Rewrites imports to account for the packages being moved Adds in buf.work.yaml to enable buf workspaces Names the proto-public buf module so that we can override the Go package imports within proto/buf.yaml Bumps the buf version dependency to 1.14.0 (I was trying out the version to see if it would get around an issue - it didn't but it also doesn't break things and it seemed best to keep up with the toolchain changes) Why: In the future we will need to consume other protobuf dependencies such as the Google HTTP annotations for openapi generation or grpc-gateway usage. There were some recent changes to have our own ratelimiting annotations. The two combined were not working when I was trying to use them together (attempting to rebase another branch) Buf workspaces should be the solution to the problem Buf workspaces means that each module will have generated Go code that embeds proto file names relative to the proto dir and not the top level repo root. This resulted in proto file name conflicts in the Go global protobuf type registry. The solution to that was to add in a private/ directory into the path within the proto/ directory. That then required rewriting all the imports. Is this safe? AFAICT yes The gRPC wire protocol doesn't seem to care about the proto file names (although the Go grpc code does tack on the proto file name as Metadata in the ServiceDesc) Other than imports, there were no changes to any generated code as a result of this.
2023-02-17 16:14:46 -05:00
"github.com/hashicorp/consul/proto/private/pbservice"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
type MaterializerDeps struct {
Conn *grpc.ClientConn
Logger hclog.Logger
}
func NewMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest {
return func(index uint64) *pbsubscribe.SubscribeRequest {
req := &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{
NamedSubject: &pbsubscribe.NamedSubject{
Key: srvReq.ServiceName,
Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(),
Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(),
PeerName: srvReq.PeerName,
},
},
Token: srvReq.Token,
Datacenter: srvReq.Datacenter,
Index: index,
}
if srvReq.Connect {
req.Topic = pbsubscribe.Topic_ServiceHealthConnect
}
return req
}
}
func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
fe, err := newFilterEvaluator(req)
if err != nil {
return nil, err
}
return &HealthView{
state: make(map[string]structs.CheckServiceNode),
filter: fe,
connect: req.Connect,
kind: req.ServiceKind,
}, nil
}
// HealthView implements submatview.View for storing the view state
// of a service health result. We store it as a map to make updates and
// deletions a little easier but we could just store a result type
// (IndexedCheckServiceNodes) and update it in place for each event - that
// involves re-sorting each time etc. though.
type HealthView struct {
connect bool
kind structs.ServiceKind
state map[string]structs.CheckServiceNode
filter filterEvaluator
}
// Update implements View
func (s *HealthView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
return fmt.Errorf("unexpected event type for service health view: %T",
event.GetPayload())
}
id := serviceHealth.CheckServiceNode.UniqueID()
switch serviceHealth.Op {
case pbsubscribe.CatalogOp_Register:
csn, err := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
if err != nil {
return err
}
if csn == nil {
return errors.New("check service node was unexpectedly nil")
}
// check if we intentionally need to skip the filter
if s.skipFilter(csn) {
s.state[id] = *csn
continue
}
passed, err := s.filter.Evaluate(*csn)
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12640) The primary bug here is in the streaming subsystem that makes the overall v1/health/service/:service request behave incorrectly when servicing a blocking request with a filter provided. There is a secondary non-streaming bug being fixed here that is much less obvious related to when to update the `reply` variable in a `blockingQuery` evaluation. It is unlikely that it is triggerable in practical environments and I could not actually get the bug to manifest, but I fixed it anyway while investigating the original issue. Simple reproduction (streaming): 1. Register a service with a tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "a" ], "EnableTagOverride": true }' 2. Do an initial filter query that matches on the tag. curl -sLi --get 'http://localhost:8500/v1/health/service/test' --data-urlencode 'filter=a in Service.Tags' 3. Note you get one result. Use the `X-Consul-Index` header to establish a blocking query in another terminal, this should not return yet. curl -sLi --get 'http://localhost:8500/v1/health/service/test?index=$INDEX' --data-urlencode 'filter=a in Service.Tags' 4. Re-register that service with a different tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "b" ], "EnableTagOverride": true }' 5. Your blocking query from (3) should return with a header `X-Consul-Query-Backend: streaming` and empty results if it works correctly `[]`. Attempts to reproduce with non-streaming failed (where you add `&near=_agent` to the read queries and ensure `X-Consul-Query-Backend: blocking-query` shows up in the results).
2022-04-27 10:39:45 -05:00
if err != nil {
return err
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12640) The primary bug here is in the streaming subsystem that makes the overall v1/health/service/:service request behave incorrectly when servicing a blocking request with a filter provided. There is a secondary non-streaming bug being fixed here that is much less obvious related to when to update the `reply` variable in a `blockingQuery` evaluation. It is unlikely that it is triggerable in practical environments and I could not actually get the bug to manifest, but I fixed it anyway while investigating the original issue. Simple reproduction (streaming): 1. Register a service with a tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "a" ], "EnableTagOverride": true }' 2. Do an initial filter query that matches on the tag. curl -sLi --get 'http://localhost:8500/v1/health/service/test' --data-urlencode 'filter=a in Service.Tags' 3. Note you get one result. Use the `X-Consul-Index` header to establish a blocking query in another terminal, this should not return yet. curl -sLi --get 'http://localhost:8500/v1/health/service/test?index=$INDEX' --data-urlencode 'filter=a in Service.Tags' 4. Re-register that service with a different tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "b" ], "EnableTagOverride": true }' 5. Your blocking query from (3) should return with a header `X-Consul-Query-Backend: streaming` and empty results if it works correctly `[]`. Attempts to reproduce with non-streaming failed (where you add `&near=_agent` to the read queries and ensure `X-Consul-Query-Backend: blocking-query` shows up in the results).
2022-04-27 10:39:45 -05:00
} else if passed {
s.state[id] = *csn
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12640) The primary bug here is in the streaming subsystem that makes the overall v1/health/service/:service request behave incorrectly when servicing a blocking request with a filter provided. There is a secondary non-streaming bug being fixed here that is much less obvious related to when to update the `reply` variable in a `blockingQuery` evaluation. It is unlikely that it is triggerable in practical environments and I could not actually get the bug to manifest, but I fixed it anyway while investigating the original issue. Simple reproduction (streaming): 1. Register a service with a tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "a" ], "EnableTagOverride": true }' 2. Do an initial filter query that matches on the tag. curl -sLi --get 'http://localhost:8500/v1/health/service/test' --data-urlencode 'filter=a in Service.Tags' 3. Note you get one result. Use the `X-Consul-Index` header to establish a blocking query in another terminal, this should not return yet. curl -sLi --get 'http://localhost:8500/v1/health/service/test?index=$INDEX' --data-urlencode 'filter=a in Service.Tags' 4. Re-register that service with a different tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "b" ], "EnableTagOverride": true }' 5. Your blocking query from (3) should return with a header `X-Consul-Query-Backend: streaming` and empty results if it works correctly `[]`. Attempts to reproduce with non-streaming failed (where you add `&near=_agent` to the read queries and ensure `X-Consul-Query-Backend: blocking-query` shows up in the results).
2022-04-27 10:39:45 -05:00
} else {
delete(s.state, id)
}
case pbsubscribe.CatalogOp_Deregister:
delete(s.state, id)
}
}
return nil
}
func (s *HealthView) skipFilter(csn *structs.CheckServiceNode) bool {
// we only do this for connect-enabled services that need to be routed through a terminating gateway
return s.kind == "" && s.connect && csn.Service.Kind == structs.ServiceKindTerminatingGateway
}
type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error)
}
func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) {
var evaluators []filterEvaluator
typ := reflect.TypeOf(structs.CheckServiceNode{})
if req.Filter != "" {
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}
if req.ServiceTag != "" {
// Handle backwards compat with old field
req.ServiceTags = []string{req.ServiceTag}
}
if req.TagFilter && len(req.ServiceTags) > 0 {
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
}
for key, value := range req.NodeMetaFilters {
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}
switch len(evaluators) {
case 0:
return noopFilterEvaluator{}, nil
case 1:
return evaluators[0], nil
default:
return &multiFilterEvaluator{evaluators: evaluators}, nil
}
}
// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
// method always return true, so no items will be filtered out.
type noopFilterEvaluator struct{}
func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil
}
type multiFilterEvaluator struct {
evaluators []filterEvaluator
}
func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
for _, e := range m.evaluators {
match, err := e.Evaluate(data)
if !match || err != nil {
return match, err
}
}
return true, nil
}
// sortCheckServiceNodes sorts the results to match memdb semantics
// Sort results by Node.Node, if 2 instances match, order by Service.ID
// Will allow result to be stable sorted and match queries without cache
func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool {
left := serviceNodes.Nodes[i]
right := serviceNodes.Nodes[j]
if left.Node.Node == right.Node.Node {
return left.Service.ID < right.Service.ID
}
return left.Node.Node < right.Node.Node
})
}
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *HealthView) Result(index uint64) interface{} {
result := structs.IndexedCheckServiceNodes{
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendStreaming,
},
}
for _, node := range s.state {
result.Nodes = append(result.Nodes, node)
}
sortCheckServiceNodes(&result)
return &result
}
func (s *HealthView) Reset() {
s.state = make(map[string]structs.CheckServiceNode)
}
// serviceTagEvaluator implements the filterEvaluator to perform filtering
// by service tags. bexpr can not be used at this time, because the filtering
// must be case insensitive for backwards compatibility. In the future this
// may be replaced with bexpr once case insensitive support is added.
type serviceTagEvaluator struct {
tags []string
}
func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
csn, ok := data.(structs.CheckServiceNode)
if !ok {
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
}
for _, tag := range m.tags {
if !serviceHasTag(csn.Service, tag) {
// If any one of the expected tags was not found, filter the service
return false, nil
}
}
return true, nil
}
func serviceHasTag(sn *structs.NodeService, tag string) bool {
for _, t := range sn.Tags {
if strings.EqualFold(t, tag) {
return true
}
}
return false
}