NET-5879 - expose sameness group param on service health endpoint and move sameness group health fallback logic into HealthService RPC layer (#21096)

* NET-5879 - move the filter for non-passing to occur in the health RPC layer rather than the callers of the RPC

* fix import of slices

* NET-5879 - expose sameness group param on service health endpoint and move sameness group health fallback logic into HealthService RPC layer

* fixing deepcopy

* fix license headers
This commit is contained in:
John Murret 2024-05-14 07:32:49 -06:00 committed by GitHub
parent a975b04302
commit 9b2c1be053
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 288 additions and 165 deletions

View File

@ -8,15 +8,15 @@ import (
"sort"
"github.com/armon/go-metrics"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
)
// Health endpoint is used to query the health information
@ -250,69 +250,86 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
func(ws memdb.WatchSet, state *state.Store) error {
var thisReply structs.IndexedCheckServiceNodes
index, nodes, err := f(ws, state, args)
sgIdx, sgArgs, err := h.getArgsForSamenessGroupMembers(args, ws, state)
if err != nil {
return err
}
resolvedNodes := nodes
if args.MergeCentralConfig {
for _, node := range resolvedNodes {
ns := node.Service
if ns.IsSidecarProxy() || ns.IsGateway() {
cfgIndex, mergedns, err := configentry.MergeNodeServiceWithCentralConfig(ws, state, ns, h.logger)
if err != nil {
return err
}
if cfgIndex > index {
index = cfgIndex
}
*node.Service = *mergedns
}
}
// Generate a hash of the resolvedNodes driving this response.
// Use it to determine if the response is identical to a prior wakeup.
newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil)
for _, arg := range sgArgs {
index, nodes, err := f(ws, state, arg)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranMergeOnce && priorMergeHash == newMergeHash {
// the below assignment is not required as the if condition already validates equality,
// but makes it more clear that prior value is being reset to the new hash on each run.
priorMergeHash = newMergeHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which is desirable
return errNotChanged
} else {
priorMergeHash = newMergeHash
ranMergeOnce = true
return err
}
resolvedNodes := nodes
if arg.MergeCentralConfig {
for _, node := range resolvedNodes {
ns := node.Service
if ns.IsSidecarProxy() || ns.IsGateway() {
cfgIndex, mergedns, err := configentry.MergeNodeServiceWithCentralConfig(ws, state, ns, h.logger)
if err != nil {
return err
}
if cfgIndex > index {
index = cfgIndex
}
*node.Service = *mergedns
}
}
// Generate a hash of the resolvedNodes driving this response.
// Use it to determine if the response is identical to a prior wakeup.
newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranMergeOnce && priorMergeHash == newMergeHash {
// the below assignment is not required as the if condition already validates equality,
// but makes it more clear that prior value is being reset to the new hash on each run.
priorMergeHash = newMergeHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which is desirable
return errNotChanged
} else {
priorMergeHash = newMergeHash
ranMergeOnce = true
}
}
thisReply.Index, thisReply.Nodes = index, resolvedNodes
if len(arg.NodeMetaFilters) > 0 {
thisReply.Nodes = nodeMetaFilter(arg.NodeMetaFilters, thisReply.Nodes)
}
raw, err := filter.Execute(thisReply.Nodes)
if err != nil {
return err
}
filteredNodes := raw.(structs.CheckServiceNodes)
thisReply.Nodes = filteredNodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: arg.HealthFilterType})
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := h.srv.filterACL(arg.Token, &thisReply); err != nil {
return err
}
if err := h.srv.sortNodesByDistanceFrom(arg.Source, thisReply.Nodes); err != nil {
return err
}
if len(thisReply.Nodes) > 0 {
break
}
}
thisReply.Index, thisReply.Nodes = index, resolvedNodes
if len(args.NodeMetaFilters) > 0 {
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
}
raw, err := filter.Execute(thisReply.Nodes)
if err != nil {
return err
}
filteredNodes := raw.(structs.CheckServiceNodes)
thisReply.Nodes = filteredNodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: args.HealthFilterType})
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := h.srv.filterACL(args.Token, &thisReply); err != nil {
return err
}
if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil {
return err
// If sameness group was used, evaluate the index of the sameness group
// and update the index of the response if it is greater. If sameness group is not
// used, the sgIdx will be 0 in this evaluation.
if sgIdx > thisReply.Index {
thisReply.Index = sgIdx
}
*reply = thisReply

View File

@ -0,0 +1,38 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
//go:build !consulent
package consul
import (
"errors"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)
// getArgsForSamenessGroupMembers returns the arguments for the sameness group members if SamenessGroup
// field is set in the ServiceSpecificRequest. It returns the index of the sameness group, the arguments
// for the sameness group members and an error if any.
// If SamenessGroup is not set, it returns:
// - the index 0
// - an array containing the original arguments
// - nil error
// If SamenessGroup is set on CE, it returns::
// - the index of 0
// - nil array
// - an error indicating that sameness groups are not supported in consul CE
// If SamenessGroup is set on ENT, it returns:
// - the index of the sameness group
// - an array containing the arguments for the sameness group members
// - nil error
func (h *Health) getArgsForSamenessGroupMembers(args *structs.ServiceSpecificRequest,
ws memdb.WatchSet, state *state.Store) (uint64, []*structs.ServiceSpecificRequest, error) {
if args.SamenessGroup != "" {
return 0, nil, errors.New("sameness groups are not supported in consul CE")
}
return 0, []*structs.ServiceSpecificRequest{args}, nil
}

View File

@ -5,7 +5,6 @@ package discovery
import (
"context"
"errors"
"fmt"
"net"
"strings"
@ -469,7 +468,7 @@ func (f *V1DataFetcher) buildResultsFromServiceNodes(nodes []structs.CheckServic
Namespace: n.Service.NamespaceOrEmpty(),
Partition: n.Service.PartitionOrEmpty(),
Datacenter: n.Node.Datacenter,
PeerName: req.Tenancy.Peer,
PeerName: n.Service.PeerName,
},
})
}
@ -542,23 +541,10 @@ RPC:
return &out, nil
}
// fetchService is used to look up a service in the Consul catalog.
func (f *V1DataFetcher) fetchService(ctx Context, req *QueryPayload,
cfg *V1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) {
f.logger.Trace("fetchService", "req", req)
if req.Tenancy.SamenessGroup == "" {
return f.fetchServiceBasedOnTenancy(ctx, req, cfg, lookupType)
}
return f.fetchServiceFromSamenessGroup(ctx, req, cfg, lookupType)
}
// fetchServiceBasedOnTenancy is used to look up a service in the Consul catalog based on its tenancy or default tenancy.
func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayload,
cfg *V1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) {
f.logger.Trace(fmt.Sprintf("fetchServiceBasedOnTenancy - req: %+v", req))
if req.Tenancy.SamenessGroup != "" {
return nil, errors.New("sameness groups are not allowed for service lookups based on tenancy")
}
f.logger.Trace(fmt.Sprintf("fetchService - req: %+v", req))
// If no datacenter is passed, default to our own
datacenter := cfg.Datacenter
@ -573,14 +559,13 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa
if req.Tag != "" {
serviceTags = []string{req.Tag}
}
healthFilterType := structs.HealthFilterExcludeCritical
if cfg.OnlyPassing {
healthFilterType = structs.HealthFilterIncludeOnlyPassing
}
args := structs.ServiceSpecificRequest{
PeerName: req.Tenancy.Peer,
SamenessGroup: req.Tenancy.SamenessGroup,
Connect: lookupType == LookupTypeConnect,
Ingress: lookupType == LookupTypeIngress,
Datacenter: datacenter,
@ -611,11 +596,6 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa
return nil, ErrNotFound
}
// If we have no nodes, return not found!
if len(out.Nodes) == 0 {
return nil, ErrNotFound
}
// Perform a random shuffle
out.Nodes.Shuffle()
return f.buildResultsFromServiceNodes(out.Nodes, req, nil), nil

View File

@ -6,9 +6,6 @@
package discovery
import (
"errors"
"fmt"
"github.com/hashicorp/consul/acl"
)
@ -27,12 +24,3 @@ func validateEnterpriseTenancy(req QueryTenancy) error {
func queryTenancyToEntMeta(_ QueryTenancy) acl.EnterpriseMeta {
return acl.EnterpriseMeta{}
}
// fetchServiceFromSamenessGroup fetches a service from a sameness group.
func (f *V1DataFetcher) fetchServiceFromSamenessGroup(ctx Context, req *QueryPayload, cfg *V1DataFetcherDynamicConfig, lookupType LookupType) ([]*Result, error) {
f.logger.Trace(fmt.Sprintf("fetchServiceFromSamenessGroup - req: %+v", req))
if req.Tenancy.SamenessGroup == "" {
return nil, errors.New("sameness groups must be provided for service lookups")
}
return f.fetchServiceBasedOnTenancy(ctx, req, cfg, lookupType)
}

View File

@ -182,8 +182,9 @@ func Test_FetchEndpoints(t *testing.T) {
Node: "node-name",
},
Service: &structs.NodeService{
Address: "service-address",
Service: "service-name",
Address: "service-address",
Service: "service-name",
PeerName: "test-peer",
},
},
},

View File

@ -92,6 +92,7 @@ type serviceLookup struct {
PeerName string
Datacenter string
Service string
SamenessGroup string
Tag string
MaxRecursionLevel int
Connect bool
@ -439,18 +440,11 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
// server side to avoid transferring the entire node list.
if err := d.agent.RPC(context.Background(), "Catalog.ListNodes", &args, &out); err == nil {
for _, n := range out.Nodes {
lookup := serviceLookup{
// Peering PTR lookups are currently not supported, so we don't
// need to populate that field for creating the node FQDN.
// PeerName: n.PeerName,
Datacenter: n.Datacenter,
EnterpriseMeta: *n.GetEnterpriseMeta(),
}
arpa, _ := dns.ReverseAddr(n.Address)
if arpa == qName {
ptr := &dns.PTR{
Hdr: dns.RR_Header{Name: q.Name, Rrtype: dns.TypePTR, Class: dns.ClassINET, Ttl: 0},
Ptr: nodeCanonicalDNSName(lookup, n.Node, d.domain),
Ptr: nodeCanonicalDNSName(n, d.domain),
}
m.Answer = append(m.Answer, ptr)
break
@ -738,6 +732,10 @@ type queryLocality struct {
// not be shared between datacenters. In all other cases, it should be considered a DC.
peerOrDatacenter string
// samenessGroup is the samenessGroup name parsed from a label that has explicit parts.
// Example query: <service>.service.<sameness group>.sg.consul
samenessGroup string
acl.EnterpriseMeta
}
@ -805,59 +803,56 @@ func (d *DNSServer) dispatch(remoteAddr net.Addr, req, resp *dns.Msg, maxRecursi
return invalid()
}
localities, err := d.parseSamenessGroupLocality(cfg, querySuffixes, invalid)
locality, err := d.parseSamenessGroupLocality(cfg, querySuffixes, invalid)
if err != nil {
return err
}
// Loop over the localities and return as soon as a lookup is successful
for _, locality := range localities {
d.logger.Debug("labels", "querySuffixes", querySuffixes)
lookup := serviceLookup{
Datacenter: locality.effectiveDatacenter(d.agent.config.Datacenter),
PeerName: locality.peer,
SamenessGroup: locality.samenessGroup,
Connect: false,
Ingress: false,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: locality.EnterpriseMeta,
}
lookup := serviceLookup{
Datacenter: locality.effectiveDatacenter(d.agent.config.Datacenter),
PeerName: locality.peer,
Connect: false,
Ingress: false,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: locality.EnterpriseMeta,
}
// Only one of dc or peer can be used.
if lookup.PeerName != "" {
lookup.Datacenter = ""
// Only one of dc or peer can be used.
if lookup.PeerName != "" {
lookup.Datacenter = ""
}
// Support RFC 2782 style syntax
if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") {
// Grab the tag since we make nuke it if it's tcp
tag := queryParts[1][1:]
// Treat _name._tcp.service.consul as a default, no need to filter on that tag
if tag == "tcp" {
tag = ""
}
// Support RFC 2782 style syntax
if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") {
// Grab the tag since we make nuke it if it's tcp
tag := queryParts[1][1:]
// Treat _name._tcp.service.consul as a default, no need to filter on that tag
if tag == "tcp" {
tag = ""
}
lookup.Tag = tag
lookup.Service = queryParts[0][1:]
// _name._tag.service.consul
} else {
// Consul 0.3 and prior format for SRV queries
// Support "." in the label, re-join all the parts
tag := ""
if n >= 2 {
tag = strings.Join(queryParts[:n-1], ".")
}
lookup.Tag = tag
lookup.Service = queryParts[n-1]
// tag[.tag].name.service.consul
lookup.Tag = tag
lookup.Service = queryParts[0][1:]
// _name._tag.service.consul
} else {
// Consul 0.3 and prior format for SRV queries
// Support "." in the label, re-join all the parts
tag := ""
if n >= 2 {
tag = strings.Join(queryParts[:n-1], ".")
}
err = d.handleServiceQuery(cfg, lookup, req, resp)
// Return if we are error free right away, otherwise loop again if we can
if err == nil {
return nil
}
lookup.Tag = tag
lookup.Service = queryParts[n-1]
// tag[.tag].name.service.consul
}
err = d.handleServiceQuery(cfg, lookup, req, resp)
// Return if we are error free right away, otherwise loop again if we can
if err == nil {
return nil
}
// We've exhausted all DNS possibilities so return here
@ -1456,6 +1451,7 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st
}
args := structs.ServiceSpecificRequest{
PeerName: lookup.PeerName,
SamenessGroup: lookup.SamenessGroup,
Connect: lookup.Connect,
Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter,
@ -1758,20 +1754,20 @@ func findWeight(node structs.CheckServiceNode) int {
}
}
func (d *DNSServer) encodeIPAsFqdn(questionName string, lookup serviceLookup, ip net.IP) string {
func (d *DNSServer) encodeIPAsFqdn(questionName string, serviceNode structs.CheckServiceNode, ip net.IP) string {
ipv4 := ip.To4()
respDomain := d.getResponseDomain(questionName)
ipStr := hex.EncodeToString(ip)
if ipv4 != nil {
ipStr = ipStr[len(ipStr)-(net.IPv4len*2):]
}
if lookup.PeerName != "" {
if serviceNode.Service.PeerName != "" {
// Exclude the datacenter from the FQDN on the addr for peers.
// This technically makes no difference, since the addr endpoint ignores the DC
// component of the request, but do it anyway for a less confusing experience.
return fmt.Sprintf("%s.addr.%s", ipStr, respDomain)
}
return fmt.Sprintf("%s.addr.%s.%s", ipStr, lookup.Datacenter, respDomain)
return fmt.Sprintf("%s.addr.%s.%s", ipStr, serviceNode.Node.Datacenter, respDomain)
}
// Craft dns records for a an A record for an IP address
@ -1860,7 +1856,7 @@ func (d *DNSServer) makeRecordFromServiceNode(lookup serviceLookup, serviceNode
if q.Qtype == dns.TypeSRV {
respDomain := d.getResponseDomain(q.Name)
nodeFQDN := nodeCanonicalDNSName(lookup, serviceNode.Node.Node, respDomain)
nodeFQDN := nodeCanonicalDNSName(serviceNode.Node, respDomain)
answers := []dns.RR{
&dns.SRV{
Hdr: dns.RR_Header{
@ -1895,7 +1891,7 @@ func (d *DNSServer) makeRecordFromIP(lookup serviceLookup, addr net.IP, serviceN
}
if q.Qtype == dns.TypeSRV {
ipFQDN := d.encodeIPAsFqdn(q.Name, lookup, addr)
ipFQDN := d.encodeIPAsFqdn(q.Name, serviceNode, addr)
answers := []dns.RR{
&dns.SRV{
Hdr: dns.RR_Header{
@ -2076,7 +2072,7 @@ func (d *DNSServer) addServiceSRVRecordsToMessage(cfg *dnsConfig, lookup service
resp.Extra = append(resp.Extra, extra...)
if cfg.NodeMetaTXT {
resp.Extra = append(resp.Extra, d.makeTXTRecordFromNodeMeta(nodeCanonicalDNSName(lookup, node.Node.Node, respDomain), node.Node, ttl)...)
resp.Extra = append(resp.Extra, d.makeTXTRecordFromNodeMeta(nodeCanonicalDNSName(node.Node, respDomain), node.Node, ttl)...)
}
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
)
// NOTE: these functions have also been copied to agent/dns package for dns v2.
@ -63,27 +64,27 @@ func (d *DNSServer) parseLocality(labels []string, cfg *dnsConfig) (queryLocalit
type querySameness struct{}
// parseSamenessGroupLocality wraps parseLocality in CE
func (d *DNSServer) parseSamenessGroupLocality(cfg *dnsConfig, labels []string, errfnc func() error) ([]queryLocality, error) {
func (d *DNSServer) parseSamenessGroupLocality(cfg *dnsConfig, labels []string, errfnc func() error) (queryLocality, error) {
locality, ok := d.parseLocality(labels, cfg)
if !ok {
return nil, errfnc()
return queryLocality{}, errfnc()
}
return []queryLocality{locality}, nil
return locality, nil
}
func serviceCanonicalDNSName(name, kind, datacenter, domain string, _ *acl.EnterpriseMeta) string {
return fmt.Sprintf("%s.%s.%s.%s", name, kind, datacenter, domain)
}
func nodeCanonicalDNSName(lookup serviceLookup, nodeName, respDomain string) string {
if lookup.PeerName != "" {
func nodeCanonicalDNSName(node *structs.Node, respDomain string) string {
if node.PeerName != "" {
// We must return a more-specific DNS name for peering so
// that there is no ambiguity with lookups.
return fmt.Sprintf("%s.node.%s.peer.%s",
nodeName,
lookup.PeerName,
node.Node,
node.PeerName,
respDomain)
}
// Return a simpler format for non-peering nodes.
return fmt.Sprintf("%s.node.%s.%s", nodeName, lookup.Datacenter, respDomain)
return fmt.Sprintf("%s.node.%s.%s", node.Node, node.Datacenter, respDomain)
}

View File

@ -188,6 +188,10 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
}
s.parsePeerName(req, &args)
s.parseSamenessGroup(req, &args)
if args.SamenessGroup != "" && args.PeerName != "" {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "peer-name and sameness-group are mutually exclusive"}
}
// Check for tags
params := req.URL.Query()
@ -214,7 +218,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
prefix = "/v1/health/service/"
}
// Parse out the service name from the query params
// Parse the service name from the query params
args.ServiceName = strings.TrimPrefix(req.URL.Path, prefix)
if args.ServiceName == "" {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"}

View File

@ -0,0 +1,30 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
//go:build !consulent
package agent
import (
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
"testing"
)
func TestHealthServiceNodes_SamenessGroup_ErrorsOnCE(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&sameness-group=foo", nil)
resp := httptest.NewRecorder()
_, err := a.srv.HealthServiceNodes(resp, req)
require.ErrorContains(t, err, "sameness groups are not supported in consul CE")
}

View File

@ -739,7 +739,7 @@ func decodeBody(body io.Reader, out interface{}) error {
return lib.DecodeJSON(body, out)
}
// decodeBodyDeprecated is deprecated, please ues decodeBody above.
// decodeBodyDeprecated is deprecated, please use decodeBody above.
// decodeBodyDeprecated is used to decode a JSON request body
func decodeBodyDeprecated(req *http.Request, out interface{}, cb func(interface{}) error) error {
// This generally only happens in tests since real HTTP requests set
@ -1208,6 +1208,15 @@ func (s *HTTPHandlers) parsePeerName(req *http.Request, args *structs.ServiceSpe
}
}
func (s *HTTPHandlers) parseSamenessGroup(req *http.Request, args *structs.ServiceSpecificRequest) {
if sg := req.URL.Query().Get("sg"); sg != "" {
args.SamenessGroup = sg
}
if sg := req.URL.Query().Get("sameness-group"); sg != "" {
args.SamenessGroup = sg
}
}
// parseMetaFilter is used to parse the ?node-meta=key:value query parameter, used for
// filtering results to nodes with the given metadata key/value
func (s *HTTPHandlers) parseMetaFilter(req *http.Request) map[string]string {

View File

@ -105,7 +105,12 @@ func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
// Streaming is incompatible with NearestN queries (due to lack of ordering),
// so we can only use it if the NearestN would never work (Node == "")
// or if we explicitly say to ignore the Node field for queries (agentless xDS).
(req.Source.Node == "" || req.Source.DisableNode)
(req.Source.Node == "" || req.Source.DisableNode) &&
// Streaming is incompatible with SamenessGroup queries at the moment because
// the subscribe functionality maps to queries based on the service name and tenancy information
// it does not support the ability to subscribe to the same service in different partitions or peers
// and materialize the results into a single view with the first healthy sameness group member.
req.SamenessGroup == ""
}
func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {

View File

@ -98,6 +98,17 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
},
expected: useRPC,
},
{
name: "rpc if sameness group",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
SamenessGroup: "sg1",
MergeCentralConfig: false,
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
},
expected: useRPC,
},
}
for _, tc := range testCases {
@ -246,6 +257,15 @@ func TestClient_Notify_BackendRouting(t *testing.T) {
},
expected: useCache,
},
{
name: "use cache for sameness group request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
SamenessGroup: "test-group",
},
expected: useCache,
},
}
for _, tc := range testCases {

View File

@ -52,6 +52,7 @@ deep-copy \
-type ServiceRoute \
-type ServiceRouteDestination \
-type ServiceRouteMatch \
-type ServiceSpecificRequest \
-type TCPRouteConfigEntry \
-type Upstream \
-type UpstreamConfiguration \

View File

@ -1,4 +1,4 @@
// generated by deep-copy -pointer-receiver -o ./structs.deepcopy.go -type APIGatewayListener -type BoundAPIGatewayListener -type CARoot -type CheckServiceNode -type CheckType -type CompiledDiscoveryChain -type ConnectProxyConfig -type DiscoveryFailover -type DiscoveryGraphNode -type DiscoveryResolver -type DiscoveryRoute -type DiscoverySplit -type ExposeConfig -type ExportedServicesConfigEntry -type FileSystemCertificateConfigEntry -type GatewayService -type GatewayServiceTLSConfig -type HTTPHeaderModifiers -type HTTPRouteConfigEntry -type HashPolicy -type HealthCheck -type IndexedCARoots -type IngressListener -type InlineCertificateConfigEntry -type Intention -type IntentionPermission -type LoadBalancer -type MeshConfigEntry -type MeshDirectionalTLSConfig -type MeshTLSConfig -type Node -type NodeService -type PeeringServiceMeta -type ServiceConfigEntry -type ServiceConfigResponse -type ServiceConnect -type ServiceDefinition -type ServiceResolverConfigEntry -type ServiceResolverFailover -type ServiceRoute -type ServiceRouteDestination -type ServiceRouteMatch -type TCPRouteConfigEntry -type Upstream -type UpstreamConfiguration -type Status -type BoundAPIGatewayConfigEntry ./; DO NOT EDIT.
// generated by deep-copy -pointer-receiver -o ./structs.deepcopy.go -type APIGatewayListener -type BoundAPIGatewayListener -type CARoot -type CheckServiceNode -type CheckType -type CompiledDiscoveryChain -type ConnectProxyConfig -type DiscoveryFailover -type DiscoveryGraphNode -type DiscoveryResolver -type DiscoveryRoute -type DiscoverySplit -type ExposeConfig -type ExportedServicesConfigEntry -type FileSystemCertificateConfigEntry -type GatewayService -type GatewayServiceTLSConfig -type HTTPHeaderModifiers -type HTTPRouteConfigEntry -type HashPolicy -type HealthCheck -type IndexedCARoots -type IngressListener -type InlineCertificateConfigEntry -type Intention -type IntentionPermission -type LoadBalancer -type MeshConfigEntry -type MeshDirectionalTLSConfig -type MeshTLSConfig -type Node -type NodeService -type PeeringServiceMeta -type ServiceConfigEntry -type ServiceConfigResponse -type ServiceConnect -type ServiceDefinition -type ServiceResolverConfigEntry -type ServiceResolverFailover -type ServiceRoute -type ServiceRouteDestination -type ServiceRouteMatch -type ServiceSpecificRequest -type TCPRouteConfigEntry -type Upstream -type UpstreamConfiguration -type Status -type BoundAPIGatewayConfigEntry ./; DO NOT EDIT.
package structs
@ -1197,6 +1197,22 @@ func (o *ServiceRouteMatch) DeepCopy() *ServiceRouteMatch {
return &cp
}
// DeepCopy generates a deep copy of *ServiceSpecificRequest
func (o *ServiceSpecificRequest) DeepCopy() *ServiceSpecificRequest {
var cp ServiceSpecificRequest = *o
if o.NodeMetaFilters != nil {
cp.NodeMetaFilters = make(map[string]string, len(o.NodeMetaFilters))
for k2, v2 := range o.NodeMetaFilters {
cp.NodeMetaFilters[k2] = v2
}
}
if o.ServiceTags != nil {
cp.ServiceTags = make([]string, len(o.ServiceTags))
copy(cp.ServiceTags, o.ServiceTags)
}
return &cp
}
// DeepCopy generates a deep copy of *TCPRouteConfigEntry
func (o *TCPRouteConfigEntry) DeepCopy() *TCPRouteConfigEntry {
var cp TCPRouteConfigEntry = *o

View File

@ -753,6 +753,9 @@ type ServiceSpecificRequest struct {
// The name of the peer that the requested service was imported from.
PeerName string
// The name of the sameness group that should be the target of the query.
SamenessGroup string
NodeMetaFilters map[string]string
ServiceName string
ServiceKind ServiceKind
@ -821,6 +824,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo {
r.Filter,
r.EnterpriseMeta,
r.PeerName,
r.SamenessGroup,
r.Ingress,
r.ServiceKind,
r.MergeCentralConfig,

View File

@ -117,6 +117,13 @@ type QueryOptions struct {
// Note: Partitions are available only in Consul Enterprise
Partition string
// SamenessGroup is used find the SamenessGroup in the given
// Partition and will find the failover order for the Service
// from the SamenessGroup Members, with the given Partition being
// the first member.
// Note: SamenessGroups are available only in Consul Enterprise
SamenessGroup string
// Providing a datacenter overwrites the DC provided
// by the Config
Datacenter string
@ -847,6 +854,12 @@ func (r *request) setQueryOptions(q *QueryOptions) {
// rather than the alternative short-hand "ap"
r.params.Set("partition", q.Partition)
}
if q.SamenessGroup != "" {
// For backwards-compatibility with existing tests,
// use the long-hand query param name "sameness-group"
// rather than the alternative short-hand "sg"
r.params.Set("sameness-group", q.SamenessGroup)
}
if q.Datacenter != "" {
// For backwards-compatibility with existing tests,
// use the short-hand query param name "dc"