Add sameness group field to prepared queries (#17089)

* added method for converting SamenessGroupConfigEntry
- added new method `ToQueryFailoverTargets` for converting a SamenessGroupConfigEntry's members to a list of QueryFailoverTargets
- renamed `ToFailoverTargets` to `ToServiceResolverFailoverTargets` to distinguish it from `ToQueryFailoverTargets`

* Added SamenessGroup to PreparedQuery
- exposed Service.Partition to API when defining a prepared query
- added a method for determining if a QueryFailoverOptions is empty
- This will be useful for validation
- added unit tests

* added method for retrieving a SamenessGroup to state store

* added logic for using PQ with SamenessGroup
- added branching path for SamenessGroup handling in execute. It will be handled separately from the normal PQ case
- added a new interface so that the `GetSamenessGroupFailoverTargets` can be properly tested
- separated the execute logic into a `targetSelector` function so that it can be used for both failover and sameness group PQs
- split OSS only methods into new PQ OSS files
- added validation that `samenessGroup` is an enterprise only feature

* added documentation for PQ SamenessGroup
This commit is contained in:
Michael Wilkerson 2023-04-24 13:21:28 -07:00 committed by GitHub
parent 9ce50aefbb
commit 001d540afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 421 additions and 135 deletions

View File

@ -1196,7 +1196,7 @@ func (c *compiler) makeSamenessGroupFailover(target *structs.DiscoveryTarget, op
}
var failoverTargets []*structs.DiscoveryTarget
for _, t := range samenessGroup.ToFailoverTargets() {
for _, t := range samenessGroup.ToServiceResolverFailoverTargets() {
// Rewrite the target as per the failover policy.
targetOpts := structs.MergeDiscoveryTargetOpts(opts, t.ToDiscoveryTargetOpts())
failoverTarget := c.rewriteTarget(target, targetOpts)

View File

@ -46,6 +46,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
".Tags[1]:tag2",
".Tags[2]:tag3",
".Peer:",
".SamenessGroup:",
}
expected = append(expected, entMetaWalkFields...)
sort.Strings(expected)

View File

@ -141,7 +141,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
}
// parseQuery makes sure the entries of a query are valid for a create or
// update operation. Some of the fields are not checked or are partially
// update operation. Some fields are not checked or are partially
// checked, as noted in the comments below. This also updates all the parsed
// fields of the query.
func parseQuery(query *structs.PreparedQuery) error {
@ -205,6 +205,10 @@ func parseService(svc *structs.ServiceQuery) error {
return err
}
if err := parseSameness(svc); err != nil {
return err
}
// We skip a few fields:
// - There's no validation for Datacenters; we skip any unknown entries
// at execution time.
@ -371,6 +375,14 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
return structs.ErrQueryNotFound
}
// If we have a sameness group, it controls the initial query and
// subsequent failover if required (Enterprise Only)
if query.Service.SamenessGroup != "" {
wrapper := newQueryServerWrapper(p.srv, p.ExecuteRemote)
if err := querySameness(wrapper, *query, args, reply); err != nil {
return err
}
} else {
// Execute the query for the local DC.
if err := p.execute(query, reply, args.Connect); err != nil {
return err
@ -431,7 +443,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
}
// Either a source IP was given but we couldnt find the associated node
// Either a source IP was given, but we couldn't find the associated node
// or no source ip was given. In both cases we should wipe the Node value
if qs.Node == "_ip" {
qs.Node = ""
@ -470,11 +482,12 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// and bail out. Otherwise, we fail over and try remote DCs, as allowed
// by the query setup.
if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
wrapper := newQueryServerWrapper(p.srv, p.ExecuteRemote)
if err := queryFailover(wrapper, *query, args, reply); err != nil {
return err
}
}
}
return nil
}
@ -660,20 +673,38 @@ func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNode
return filtered
}
// stateLookuper abstracts the sameness group lookup behind an interface so
// that the implementation can be replaced, for example by a mock in tests.
type stateLookuper interface {
// samenessGroupLookup looks up the sameness group config entry with the
// given name, scoped by entMeta. The uint64 result is presumably a state
// store index (TODO confirm against the enterprise implementation).
samenessGroupLookup(name string, entMeta acl.EnterpriseMeta) (uint64, *structs.SamenessGroupConfigEntry, error)
}
// stateLookup is the Server-backed stateLookuper that newQueryServerWrapper
// installs on a queryServerWrapper.
type stateLookup struct {
// srv is the server this lookup is bound to.
srv *Server
}
// queryServer is a wrapper that makes it easier to test the failover logic.
type queryServer interface {
GetLogger() hclog.Logger
GetOtherDatacentersByDistance() ([]string, error)
GetLocalDC() string
ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
// GetSamenessGroupFailoverTargets returns the query failover targets for
// the sameness group with the given name, scoped by entMeta.
GetSamenessGroupFailoverTargets(name string, entMeta acl.EnterpriseMeta) ([]structs.QueryFailoverTarget, error)
}
// queryServerWrapper applies the queryServer interface to a Server.
// Use newQueryServerWrapper to build one with all fields populated.
type queryServerWrapper struct {
// srv is the wrapped server.
srv *Server
// sl performs sameness group lookups; newQueryServerWrapper sets it to a
// stateLookup bound to srv.
sl stateLookuper
// executeRemote is the remote-execution callback supplied by the caller
// (presumably invoked by the ExecuteRemote method — not visible here).
executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
}
// newQueryServerWrapper builds a queryServerWrapper around srv. The wrapper
// uses executeRemote as its remote-execution callback and a stateLookup bound
// to the same server for sameness group lookups.
func newQueryServerWrapper(srv *Server, executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error) *queryServerWrapper {
	wrapper := new(queryServerWrapper)
	wrapper.srv = srv
	wrapper.sl = stateLookup{srv: srv}
	wrapper.executeRemote = executeRemote
	return wrapper
}
// GetLocalDC returns the name of the local datacenter.
func (q *queryServerWrapper) GetLocalDC() string {
return q.srv.config.Datacenter
@ -771,6 +802,29 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
// This keeps track of how many iterations we actually run.
failovers++
err = targetSelector(q, query, args, target, reply)
if err != nil {
continue
}
// We can stop if we found some nodes.
if len(reply.Nodes) > 0 {
break
}
}
// Set this at the end because the response from the remote doesn't have
// this information.
reply.Failovers = failovers
return nil
}
func targetSelector(q queryServer,
query structs.PreparedQuery,
args *structs.PreparedQueryExecuteRequest,
target structs.QueryFailoverTarget,
reply *structs.PreparedQueryExecuteResponse) error {
// Be super paranoid and set the nodes slice to nil since it's
// the same slice we used before. We know there's nothing in
// there, but the underlying msgpack library has a policy of
@ -800,6 +854,7 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
Connect: args.Connect,
}
var err error
if err = q.ExecuteRemote(remote, reply); err != nil {
q.GetLogger().Warn("Failed querying for service in datacenter",
"service", query.Service.Service,
@ -808,18 +863,8 @@ func queryFailover(q queryServer, query structs.PreparedQuery,
"enterpriseMeta", query.Service.EnterpriseMeta,
"error", err,
)
continue
return err
}
// We can stop if we found some nodes.
if len(reply.Nodes) > 0 {
break
}
}
// Set this at the end because the response from the remote doesn't have
// this information.
reply.Failovers = failovers
return nil
}

View File

@ -0,0 +1,38 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
//go:build !consulent
// +build !consulent
package consul
import (
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
)
// parseSameness validates the SamenessGroup field of a service query. In this
// (non-enterprise) build any non-empty value is rejected, because sameness
// groups are an enterprise-only feature; an empty value is accepted.
func parseSameness(svc *structs.ServiceQuery) error {
	if svc.SamenessGroup == "" {
		return nil
	}
	return fmt.Errorf("sameness-groups are an enterprise-only feature")
}
// samenessGroupLookup is a placeholder in this build: it ignores both
// arguments and always returns a zero index, a nil config entry and a nil
// error.
func (sl stateLookup) samenessGroupLookup(_ string, _ acl.EnterpriseMeta) (uint64, *structs.SamenessGroupConfigEntry, error) {
return 0, nil, nil
}
// GetSamenessGroupFailoverTargets exists to satisfy the queryServer interface.
// Sameness groups are an enterprise-only feature, so this implementation
// ignores its arguments and always returns an empty, non-nil target list and
// a nil error.
func (q *queryServerWrapper) GetSamenessGroupFailoverTargets(_ string, _ acl.EnterpriseMeta) ([]structs.QueryFailoverTarget, error) {
return []structs.QueryFailoverTarget{}, nil
}
// querySameness is the placeholder for executing a prepared query against a
// sameness group in this build. It ignores all of its arguments, leaves the
// reply untouched and always returns nil.
func querySameness(_ queryServer,
_ structs.PreparedQuery,
_ *structs.PreparedQueryExecuteRequest,
_ *structs.PreparedQueryExecuteResponse) error {
return nil
}

View File

@ -0,0 +1,52 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
//go:build !consulent
// +build !consulent
package consul
import (
"os"
"testing"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPreparedQuery_OSS_Apply verifies that PreparedQuery.Apply rejects a
// query that sets SamenessGroup, with an error mentioning "enterprise".
func TestPreparedQuery_OSS_Apply(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Set up a bare bones query.
query := structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "test",
Service: structs.ServiceQuery{
Service: "redis",
},
},
}
var reply string
// Set a sameness group on the query and ensure that creating it fails with
// an enterprise-only error.
query.Query.Service.SamenessGroup = "sg"
err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
require.Error(t, err)
assert.Contains(t, err.Error(), "enterprise")
}

View File

@ -6,6 +6,7 @@ package consul
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"reflect"
@ -37,6 +38,8 @@ import (
"github.com/hashicorp/consul/types"
)
const localTestDC = "dc1"
func TestPreparedQuery_Apply(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -2814,13 +2817,17 @@ func TestPreparedQuery_Wrapper(t *testing.T) {
})
}
var _ queryServer = (*mockQueryServer)(nil)
// mockQueryServer is a test double for the queryServer interface. It embeds
// queryServerWrapper so that it can reuse the wrapper's
// GetSamenessGroupFailoverTargets implementation.
type mockQueryServer struct {
queryServerWrapper
// Datacenters and DatacentersError are presumably what
// GetOtherDatacentersByDistance returns (its body is not visible here).
Datacenters []string
DatacentersError error
// QueryLog records one entry per ExecuteRemote call.
QueryLog []string
// QueryFn, when non-nil, is invoked by ExecuteRemote and supplies its result.
QueryFn func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
Logger hclog.Logger
LogBuffer *bytes.Buffer
// SamenessGroup backs the mockStateLookup that
// GetSamenessGroupFailoverTargets installs before delegating.
SamenessGroup map[string]*structs.SamenessGroupConfigEntry
}
func (m *mockQueryServer) JoinQueryLog() string {
@ -2841,7 +2848,7 @@ func (m *mockQueryServer) GetLogger() hclog.Logger {
}
func (m *mockQueryServer) GetLocalDC() string {
return "dc1"
return localTestDC
}
func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
@ -2850,14 +2857,21 @@ func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
peerName := args.Query.Service.Peer
partitionName := args.Query.Service.PartitionOrEmpty()
namespaceName := args.Query.Service.NamespaceOrEmpty()
dc := args.Datacenter
if peerName != "" {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("peer:%s", peerName))
} else if partitionName != "" {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("partition:%s", partitionName))
} else if namespaceName != "" {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("namespace:%s", namespaceName))
} else {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, "PreparedQuery.ExecuteRemote"))
}
reply.PeerName = peerName
reply.Datacenter = dc
reply.EnterpriseMeta = acl.NewEnterpriseMetaWithPartition(partitionName, namespaceName)
if m.QueryFn != nil {
return m.QueryFn(args, reply)
@ -2865,6 +2879,33 @@ func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemote
return nil
}
// mockStateLookup is a map-backed stateLookuper for tests.
type mockStateLookup struct {
// SamenessGroup holds the entries served by samenessGroupLookup. Keys are
// the group name, optionally followed by "-<partition>" or "-<namespace>".
SamenessGroup map[string]*structs.SamenessGroupConfigEntry
}
// samenessGroupLookup serves sameness groups from the SamenessGroup map. The
// map key is name, suffixed with "-<partition>" when entMeta carries a
// partition, otherwise with "-<namespace>" when it carries a namespace. The
// returned index is always 0; a missing key yields an error.
func (sl mockStateLookup) samenessGroupLookup(name string, entMeta acl.EnterpriseMeta) (uint64, *structs.SamenessGroupConfigEntry, error) {
	// The partition takes precedence; the namespace is only consulted when
	// no partition is set.
	suffix := entMeta.PartitionOrEmpty()
	if suffix == "" {
		suffix = entMeta.NamespaceOrEmpty()
	}

	key := name
	if suffix != "" {
		key = fmt.Sprintf("%s-%s", name, suffix)
	}

	if entry, found := sl.SamenessGroup[key]; found {
		return 0, entry, nil
	}
	return 0, nil, errors.New("unable to find sameness group")
}
// GetSamenessGroupFailoverTargets points the embedded wrapper's state lookup
// at a mockStateLookup backed by m.SamenessGroup, then delegates to the
// embedded queryServerWrapper implementation. The lookup is replaced on every
// call, so changes to m.SamenessGroup between calls are picked up.
func (m *mockQueryServer) GetSamenessGroupFailoverTargets(name string, entMeta acl.EnterpriseMeta) ([]structs.QueryFailoverTarget, error) {
m.sl = mockStateLookup{
SamenessGroup: m.SamenessGroup,
}
return m.queryServerWrapper.GetSamenessGroupFailoverTargets(name, entMeta)
}
func TestPreparedQuery_queryFailover(t *testing.T) {
t.Parallel()
query := structs.PreparedQuery{

View File

@ -0,0 +1,19 @@
package state
import (
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
// GetSamenessGroup fetches the SamenessGroupConfigEntry called name from the
// state store. The lookup runs inside a read transaction that is aborted when
// the method returns; ws, overrides and partition are passed through unchanged
// to getSamenessGroupConfigEntryTxn, whose results are returned as-is.
func (s *Store) GetSamenessGroup(ws memdb.WatchSet,
	name string,
	overrides map[configentry.KindName]structs.ConfigEntry,
	partition string) (uint64, *structs.SamenessGroupConfigEntry, error) {
	readTxn := s.db.ReadTxn()
	defer readTxn.Abort()

	idx, entry, err := getSamenessGroupConfigEntryTxn(readTxn, ws, name, overrides, partition)
	return idx, entry, err
}

View File

@ -8,20 +8,27 @@ package structs
import "fmt"
// Validate always returns an error in this build, because sameness groups
// are an enterprise-only feature.
func (s *SamenessGroupConfigEntry) Validate() error {
return fmt.Errorf("sameness-groups are an enterprise-only feature")
}
// RelatedPeers returns all peers that are members of a sameness group config entry.
// RelatedPeers is an OSS placeholder noop
func (s *SamenessGroupConfigEntry) RelatedPeers() []string {
return nil
}
// AllMembers adds the local partition to Members when it is set.
// AllMembers is an OSS placeholder noop
func (s *SamenessGroupConfigEntry) AllMembers() []SamenessGroupMember {
return nil
}
func (s *SamenessGroupConfigEntry) ToFailoverTargets() []ServiceResolverFailoverTarget {
// ToServiceResolverFailoverTargets is an OSS placeholder noop
func (s *SamenessGroupConfigEntry) ToServiceResolverFailoverTargets() []ServiceResolverFailoverTarget {
return nil
}
// ToQueryFailoverTargets is an OSS placeholder noop: it ignores namespace and
// always returns nil.
func (s *SamenessGroupConfigEntry) ToQueryFailoverTargets(namespace string) []QueryFailoverTarget {
return nil
}

View File

@ -44,6 +44,14 @@ func (f *QueryFailoverOptions) AsTargets() []QueryFailoverTarget {
return f.Targets
}
// IsEmpty reports whether the QueryFailoverOptions are unset. A nil receiver
// counts as empty; otherwise the options are empty only when NearestN is zero
// and both Datacenters and Targets have no elements.
func (f *QueryFailoverOptions) IsEmpty() bool {
	if f == nil {
		return true
	}
	return f.NearestN == 0 && len(f.Datacenters) == 0 && len(f.Targets) == 0
}
type QueryFailoverTarget struct {
// Peer specifies a peer to try during failover.
Peer string
@ -66,6 +74,11 @@ type ServiceQuery struct {
// Service is the service to query.
Service string
// SamenessGroup specifies a sameness group to query. The first member of the Sameness Group will
// be targeted first on PQ execution and subsequent members will be targeted during failover scenarios.
// This field is mutually exclusive with Failover.
SamenessGroup string
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryFailoverOptions

View File

@ -5,6 +5,8 @@ package structs
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestStructs_PreparedQuery_GetACLPrefix(t *testing.T) {
@ -36,3 +38,54 @@ func TestPreparedQueryExecuteRequest_CacheInfoKey(t *testing.T) {
ignored := []string{"Agent", "QueryOptions"}
assertCacheInfoKeyIsComplete(t, &PreparedQueryExecuteRequest{}, ignored...)
}
// TestQueryFailoverOptions_IsEmpty checks IsEmpty against the zero value and
// against options that set exactly one of NearestN, Datacenters or Targets.
func TestQueryFailoverOptions_IsEmpty(t *testing.T) {
	type testCase struct {
		desc      string
		opts      QueryFailoverOptions
		wantEmpty bool
	}

	cases := []testCase{
		{
			desc:      "expect empty",
			opts:      QueryFailoverOptions{},
			wantEmpty: true,
		},
		{
			desc:      "expect not empty NearestN",
			opts:      QueryFailoverOptions{NearestN: 1},
			wantEmpty: false,
		},
		{
			desc:      "expect not empty NearestN negative",
			opts:      QueryFailoverOptions{NearestN: -1},
			wantEmpty: false,
		},
		{
			desc:      "expect not empty datacenters",
			opts:      QueryFailoverOptions{Datacenters: []string{"dc"}},
			wantEmpty: false,
		},
		{
			desc: "expect not empty targets",
			opts: QueryFailoverOptions{
				Targets: []QueryFailoverTarget{{Peer: "peer"}},
			},
			wantEmpty: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			got := tc.opts.IsEmpty()
			assert.Equal(t, tc.wantEmpty, got)
		})
	}
}

View File

@ -51,9 +51,17 @@ type ServiceQuery struct {
// Service is the service to query.
Service string
// SamenessGroup specifies a sameness group to query. The first member of the Sameness Group will
// be targeted first on PQ execution and subsequent members will be targeted during failover scenarios.
// This field is mutually exclusive with Failover.
SamenessGroup string `json:",omitempty"`
// Namespace of the service to query
Namespace string `json:",omitempty"`
// Partition of the service to query
Partition string `json:",omitempty"`
// Near allows baking in the name of a node to automatically distance-
// sort from. The magic "_agent" value is supported, which sorts near
// the agent which initiated the request by default.
@ -61,7 +69,7 @@ type ServiceQuery struct {
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryFailoverOptions
Failover QueryFailoverOptions `json:",omitempty"`
// IgnoreCheckIDs is an optional list of health check IDs to ignore when
// considering which nodes are healthy. It is useful as an emergency measure

View File

@ -177,13 +177,22 @@ The table below shows this endpoint's support for
- `Service` `(string: <required>)` - Specifies the name of the service to
query.
- `Namespace` `(string: "")` <EnterpriseAlert inline /> - Specifies the Consul namespace
to query. If not provided the query will use Consul default namespace for resolution.
- `SamenessGroup` `(string: "")` <EnterpriseAlert inline /> - Specifies a sameness group to forward the
query to. The `SamenessGroup` will forward to its members in the order defined, returning on the first
healthy query. `SamenessGroup` is mutually exclusive with `Failover` as `SamenessGroup` members will be used
in place of a defined list of failovers.
- `Failover` contains two fields, both of which are optional, and determine
what happens if no healthy nodes are available in the local datacenter when
- `Namespace` `(string: "")` <EnterpriseAlert inline /> - Specifies the Consul namespace
to query. If not provided, the query will use Consul's default namespace for resolution. When combined with
`SamenessGroup`, this will specify the namespace in which the `SamenessGroup` will resolve all members listed.
- `Partition` `(string: "")` <EnterpriseAlert inline /> - Specifies the Consul partition
to query. If not provided the query will use Consul's default partition for resolution. When combined with
`SamenessGroup`, this will specify the partition where the `SamenessGroup` exists.
- `Failover` - Determines what happens if no healthy nodes are available in the local datacenter when
the query is executed. It allows the use of nodes in other datacenters with
very little configuration.
very little configuration. This field is mutually exclusive with `SamenessGroup`.
- `NearestN` `(int: 0)` - Specifies that the query will be forwarded to up
to `NearestN` other datacenters based on their estimated network round