Fix streaming RPCs for agentless. (#20868)

* Fix streaming RPCs for agentless.

This PR fixes an issue where cross-dc RPCs were unable to utilize
the streaming backend due to having the node name set. The result
of this was the agent-cache being utilized, which would cause high
cpu utilization and memory consumption due to the fact that it
keeps queries alive for 72 hours before purging inactive entries.

This resource consumption is compounded by the fact that each pod
in consul-k8s gets a unique token. Since the agent-cache uses the
token as a component of the key, the same query is duplicated for
each pod that is deployed.

* Add changelog.
This commit is contained in:
Derek Menteer 2024-03-15 14:44:51 -05:00 committed by GitHub
parent 0ac8ae6c3b
commit ac83ac1343
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 15 additions and 2 deletions

3
.changelog/20868.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where Consul-dataplane xDS sessions would not utilize the streaming backend for wan-federated queries.
```

View File

@ -833,6 +833,7 @@ func (a *Agent) Start(ctx context.Context) error {
Segment: a.config.SegmentName, Segment: a.config.SegmentName,
Node: a.config.NodeName, Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(), NodePartition: a.config.PartitionOrEmpty(),
DisableNode: true, // Disable for agentless so that streaming RPCs can be used.
}, },
DNSConfig: proxycfg.DNSConfig{ DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain, Domain: a.config.DNSDomain,

View File

@ -100,7 +100,12 @@ func (c *Client) Notify(
} }
func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
return c.UseStreamingBackend && !req.Ingress && req.Source.Node == "" return c.UseStreamingBackend &&
!req.Ingress &&
// Streaming is incompatible with NearestN queries (due to lack of ordering),
// so we can only use it if the NearestN would never work (Node == "")
// or if we explicitly say to ignore the Node field for queries (agentless xDS).
(req.Source.Node == "" || req.Source.DisableNode)
} }
func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest { func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {

View File

@ -582,7 +582,11 @@ type QuerySource struct {
Segment string Segment string
Node string Node string
NodePartition string `json:",omitempty"` NodePartition string `json:",omitempty"`
Ip string // DisableNode indicates that the Node and NodePartition fields should not be used
// for determining the flow of the RPC. This is needed for agentless + wanfed to
// utilize streaming RPCs.
DisableNode bool `json:",omitempty"`
Ip string
} }
func (s QuerySource) NodeEnterpriseMeta() *acl.EnterpriseMeta { func (s QuerySource) NodeEnterpriseMeta() *acl.EnterpriseMeta {