mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
ac83ac1343
* Fix streaming RPCs for agentless.

  This PR fixes an issue where cross-dc RPCs were unable to use the streaming
  backend because the node name was set. As a result, requests fell back to the
  agent cache, causing high CPU utilization and memory consumption: the agent
  cache keeps queries alive for 72 hours before purging inactive entries. The
  resource consumption is compounded by the fact that each pod in consul-k8s
  gets a unique token; since the agent cache uses the token as a component of
  the cache key, the same query is duplicated for every pod that is deployed.

* Add changelog.
146 lines
4.2 KiB
Go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package health

import (
	"context"

	"google.golang.org/grpc/connectivity"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/rpcclient"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/submatview"
	"github.com/hashicorp/consul/proto/private/pbsubscribe"
)

// Client provides access to service health data.
type Client struct {
	rpcclient.Client
}

// IsReadyForStreaming will indicate if the underlying gRPC connection is ready.
func (c *Client) IsReadyForStreaming() bool {
	conn := c.MaterializerDeps.Conn
	if conn == nil {
		return false
	}

	return conn.GetState() == connectivity.Ready
}
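
// ServiceNodes returns the list of nodes providing the given service, along
// with metadata about the result. Requests are served from the streaming
// backend when possible, and fall back to the agent cache or a direct RPC
// otherwise.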
func (c *Client) ServiceNodes(
	ctx context.Context,
	req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
	// Note: if MergeCentralConfig is requested, default to using the RPC backend for now
	// as the streaming backend and materializer do not support merging yet.
	if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) && !req.MergeCentralConfig {
		c.QueryOptionDefaults(&req.QueryOptions)

		result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
		if err != nil {
			return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
		}
		meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached}
		return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err
	}

	out, md, err := c.getServiceNodes(ctx, req)
	if err != nil {
		return out, md, err
	}

	// TODO: DNSServer emitted a metric here, do we still need it?
	if req.QueryOptions.AllowStale && req.QueryOptions.MaxStaleDuration > 0 && out.QueryMeta.LastContact > req.MaxStaleDuration {
		// The result was too stale; re-run the query as a fresh (non-stale) RPC.
		req.AllowStale = false
		err := c.NetRPC.RPC(context.Background(), "Health.ServiceNodes", &req, &out)
		return out, cache.ResultMeta{}, err
	}

	return out, md, err
}
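
// getServiceNodes issues the request through the agent cache when the caller
// asked for cached results, and as a direct RPC otherwise.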
func (c *Client) getServiceNodes(
	ctx context.Context,
	req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
	var out structs.IndexedCheckServiceNodes
	if !req.QueryOptions.UseCache {
		err := c.NetRPC.RPC(context.Background(), "Health.ServiceNodes", &req, &out)
		return out, cache.ResultMeta{}, err
	}

	raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
	if err != nil {
		return out, md, err
	}

	value, ok := raw.(*structs.IndexedCheckServiceNodes)
	if !ok {
		panic("wrong response type for cachetype.HealthServicesName")
	}

	return *value, md, nil
}
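
// Notify registers a callback to be run when the health of the given service
// changes. Updates are delivered from the streaming backend when possible,
// falling back to agent cache notifications.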
func (c *Client) Notify(
	ctx context.Context,
	req structs.ServiceSpecificRequest,
	correlationID string,
	cb cache.Callback,
) error {
	if c.useStreaming(req) {
		sr := c.newServiceRequest(req)
		return c.ViewStore.NotifyCallback(ctx, sr, correlationID, cb)
	}

	return c.Cache.NotifyCallback(ctx, c.CacheName, &req, correlationID, cb)
}
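
// useStreaming reports whether this request can be served by the streaming
// backend instead of the agent cache.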
func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
	return c.UseStreamingBackend &&
		!req.Ingress &&
		// Streaming is incompatible with NearestN queries (due to lack of ordering),
		// so we can only use it if NearestN would never work (Node == "")
		// or if we explicitly say to ignore the Node field for queries (agentless xDS).
		(req.Source.Node == "" || req.Source.DisableNode)
}
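
// newServiceRequest pairs the request with the materializer dependencies it
// needs to be resolved by the view store.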
func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
	return serviceRequest{
		ServiceSpecificRequest: req,
		deps:                   c.MaterializerDeps,
	}
}

var _ submatview.Request = (*serviceRequest)(nil)

// serviceRequest is the submatview.Request implementation used to materialize
// service health results from the state change event stream.
type serviceRequest struct {
	structs.ServiceSpecificRequest

	deps rpcclient.MaterializerDeps
}

// CacheInfo returns the information used to uniquely identify this request.
func (r serviceRequest) CacheInfo() cache.RequestInfo {
	return r.ServiceSpecificRequest.CacheInfo()
}

// Type returns a string that uniquely identifies this type of request.
func (r serviceRequest) Type() string {
	return "agent.rpcclient.health.serviceRequest"
}

// NewMaterializer returns a Materializer that subscribes to the health event
// stream over gRPC and maintains a local view of the results.
func (r serviceRequest) NewMaterializer() (submatview.Materializer, error) {
	view, err := NewHealthView(r.ServiceSpecificRequest)
	if err != nil {
		return nil, err
	}
	deps := submatview.Deps{
		View:    view,
		Logger:  r.deps.Logger,
		Request: NewMaterializerRequest(r.ServiceSpecificRequest),
	}

	return submatview.NewRPCMaterializer(pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn), deps), nil
}