diff --git a/.changelog/20868.txt b/.changelog/20868.txt new file mode 100644 index 0000000000..172cd12f24 --- /dev/null +++ b/.changelog/20868.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect: Fix issue where Consul-dataplane xDS sessions would not utilize the streaming backend for wan-federated queries. +``` diff --git a/agent/agent.go b/agent/agent.go index 98e1252cef..641ca4848c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -833,6 +833,7 @@ func (a *Agent) Start(ctx context.Context) error { Segment: a.config.SegmentName, Node: a.config.NodeName, NodePartition: a.config.PartitionOrEmpty(), + DisableNode: true, // Disable for agentless so that streaming RPCs can be used. }, DNSConfig: proxycfg.DNSConfig{ Domain: a.config.DNSDomain, diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index f062d2aac2..6732b2f88e 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -100,7 +100,12 @@ func (c *Client) Notify( } func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool { - return c.UseStreamingBackend && !req.Ingress && req.Source.Node == "" + return c.UseStreamingBackend && + !req.Ingress && + // Streaming is incompatible with NearestN queries (due to lack of ordering), + // so we can only use it if the NearestN would never work (Node == "") + // or if we explicitly say to ignore the Node field for queries (agentless xDS). + (req.Source.Node == "" || req.Source.DisableNode) } func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index d9fbac2bfe..0ac72b07fe 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -582,7 +582,11 @@ type QuerySource struct { Segment string Node string NodePartition string `json:",omitempty"` - Ip string + // DisableNode indicates that the Node and NodePartition fields should not be used + // for determining the flow of the RPC. This is needed for agentless + wanfed to + // utilize streaming RPCs. + DisableNode bool `json:",omitempty"` + Ip string } func (s QuerySource) NodeEnterpriseMeta() *acl.EnterpriseMeta {