Add Cluster Peering Failover Support to Prepared Queries (#13835)

Add peering failover support to prepared queries
This commit is contained in:
Eric Haberkorn 2022-07-22 09:14:43 -04:00 committed by GitHub
parent f47319b7c6
commit 501089292e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 369 additions and 101 deletions

View File

@ -22,7 +22,7 @@ var (
},
Service: structs.ServiceQuery{
Service: "${name.full}",
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
Datacenters: []string{
"${name.full}",
"${name.prefix}",
@ -69,7 +69,7 @@ var (
},
Service: structs.ServiceQuery{
Service: "${name.full}",
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
Datacenters: []string{
"dc1",
"dc2",

View File

@ -20,7 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
service := &structs.ServiceQuery{
Service: "the-service",
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
Datacenters: []string{"dc1", "dc2"},
},
Near: "_agent",

View File

@ -187,11 +187,16 @@ func parseService(svc *structs.ServiceQuery) error {
return fmt.Errorf("Must provide a Service name to query")
}
failover := svc.Failover
// NearestN can be 0 which means "don't fail over by RTT".
if svc.Failover.NearestN < 0 {
if failover.NearestN < 0 {
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
}
if (failover.NearestN != 0 || len(failover.Datacenters) != 0) && len(failover.Targets) != 0 {
return fmt.Errorf("Targets cannot be populated with NearestN or Datacenters")
}
// Make sure the metadata filters are valid
if err := structs.ValidateNodeMetadata(svc.NodeMeta, true); err != nil {
return err
@ -462,7 +467,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// and bail out. Otherwise, we fail over and try remote DCs, as allowed
// by the query setup.
if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{p.srv}
wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
if err := queryFailover(wrapper, query, args, reply); err != nil {
return err
}
@ -565,8 +570,13 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
reply.Nodes = nodes
reply.DNS = query.DNS
// Stamp the result for this datacenter.
reply.Datacenter = p.srv.config.Datacenter
// Stamp the result with its this datacenter or peer.
if peerName := query.Service.PeerName; peerName != "" {
reply.PeerName = peerName
reply.Datacenter = ""
} else {
reply.Datacenter = p.srv.config.Datacenter
}
return nil
}
@ -651,12 +661,24 @@ func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNode
type queryServer interface {
GetLogger() hclog.Logger
GetOtherDatacentersByDistance() ([]string, error)
ForwardDC(method, dc string, args interface{}, reply interface{}) error
GetLocalDC() string
ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
}
// queryServerWrapper applies the queryServer interface to a Server.
type queryServerWrapper struct {
srv *Server
srv *Server
executeRemote func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
}
// GetLocalDC returns the name of the local datacenter.
func (q *queryServerWrapper) GetLocalDC() string {
return q.srv.config.Datacenter
}
// ExecuteRemote calls ExecuteRemote on PreparedQuery.
func (q *queryServerWrapper) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
return q.executeRemote(args, reply)
}
// GetLogger returns the server's logger.
@ -683,11 +705,6 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
return result, nil
}
// ForwardDC calls into the server's RPC forwarder.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
return q.srv.forwardDC(method, dc, args, reply)
}
// queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery,
@ -709,7 +726,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// Build a candidate list of DCs to try, starting with the nearest N
// from RTTs.
var dcs []string
var targets []structs.QueryFailoverTarget
index := make(map[string]struct{})
if query.Service.Failover.NearestN > 0 {
for i, dc := range nearest {
@ -717,30 +734,36 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
break
}
dcs = append(dcs, dc)
targets = append(targets, structs.QueryFailoverTarget{Datacenter: dc})
index[dc] = struct{}{}
}
}
// Then add any DCs explicitly listed that weren't selected above.
for _, dc := range query.Service.Failover.Datacenters {
for _, target := range query.Service.Failover.AsTargets() {
// This will prevent a log of other log spammage if we do not
// attempt to talk to datacenters we don't know about.
if _, ok := known[dc]; !ok {
q.GetLogger().Debug("Skipping unknown datacenter in prepared query", "datacenter", dc)
continue
if dc := target.Datacenter; dc != "" {
if _, ok := known[dc]; !ok {
q.GetLogger().Debug("Skipping unknown datacenter in prepared query", "datacenter", dc)
continue
}
// This will make sure we don't re-try something that fails
// from the NearestN list.
if _, ok := index[dc]; !ok {
targets = append(targets, target)
}
}
// This will make sure we don't re-try something that fails
// from the NearestN list.
if _, ok := index[dc]; !ok {
dcs = append(dcs, dc)
if target.PeerName != "" {
targets = append(targets, target)
}
}
// Now try the selected DCs in priority order.
failovers := 0
for _, dc := range dcs {
for _, target := range targets {
// This keeps track of how many iterations we actually run.
failovers++
@ -752,7 +775,15 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// through this slice across successive RPC calls.
reply.Nodes = nil
// Note that we pass along the limit since it can be applied
// Reset PeerName because it may have been set by a previous failover
// target.
query.Service.PeerName = target.PeerName
dc := target.Datacenter
if target.PeerName != "" {
dc = q.GetLocalDC()
}
// Note that we pass along the limit since may be applied
// remotely to save bandwidth. We also pass along the consistency
// mode information and token we were given, so that applies to
// the remote query as well.
@ -763,9 +794,11 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
QueryOptions: args.QueryOptions,
Connect: args.Connect,
}
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
if err = q.ExecuteRemote(remote, reply); err != nil {
q.GetLogger().Warn("Failed querying for service in datacenter",
"service", query.Service.Service,
"peerName", query.Service.PeerName,
"datacenter", dc,
"error", err,
)

View File

@ -2,6 +2,9 @@ package consul
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"reflect"
@ -14,6 +17,7 @@ import (
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"
@ -23,6 +27,7 @@ import (
"github.com/hashicorp/consul/agent/structs/aclfilter"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
@ -82,8 +87,25 @@ func TestPreparedQuery_Apply(t *testing.T) {
t.Fatalf("bad: %v", err)
}
// Fix that and make sure it propagates an error from the Raft apply.
// Fix that and ensure Targets and NearestN cannot be set at the same time.
query.Query.Service.Failover.NearestN = 1
query.Query.Service.Failover.Targets = []structs.QueryFailoverTarget{{PeerName: "peer"}}
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
if err == nil || !strings.Contains(err.Error(), "Targets cannot be populated with") {
t.Fatalf("bad: %v", err)
}
// Fix that and ensure Targets and Datacenters cannot be set at the same time.
query.Query.Service.Failover.NearestN = 0
query.Query.Service.Failover.Datacenters = []string{"dc2"}
query.Query.Service.Failover.Targets = []structs.QueryFailoverTarget{{PeerName: "peer"}}
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
if err == nil || !strings.Contains(err.Error(), "Targets cannot be populated with") {
t.Fatalf("bad: %v", err)
}
// Fix that and make sure it propagates an error from the Raft apply.
query.Query.Service.Failover.Targets = nil
query.Query.Session = "nope"
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply)
if err == nil || !strings.Contains(err.Error(), "invalid session") {
@ -1442,6 +1464,17 @@ func TestPreparedQuery_Execute(t *testing.T) {
s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc3"
c.PrimaryDatacenter = "dc3"
c.NodeName = "acceptingServer.dc3"
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
waitForLeaderEstablishment(t, s3)
codec3 := rpcClient(t, s3)
defer codec3.Close()
// Try to WAN join.
joinWAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
@ -1456,6 +1489,70 @@ func TestPreparedQuery_Execute(t *testing.T) {
// check for RPC forwarding
testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root"))
testrpc.WaitForLeader(t, s1.RPC, "dc2", testrpc.WithToken("root"))
testrpc.WaitForLeader(t, s3.RPC, "dc3")
acceptingPeerName := "my-peer-accepting-server"
dialingPeerName := "my-peer-dialing-server"
// Set up peering between dc1 (dailing) and dc3 (accepting) and export the foo service
{
// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: dialingPeerName,
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
p := &pbpeering.Peering{
ID: "cc56f0b8-3885-4e78-8d7b-614a0c45712d",
Name: acceptingPeerName,
PeerID: token.PeerID,
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p.ShouldDial())
require.NoError(t, s1.fsm.State().PeeringWrite(1000, p))
// Wait for the stream to be connected.
retry.Run(t, func(r *retry.R) {
status, found := s1.peerStreamServer.StreamStatus(p.ID)
require.True(r, found)
require.True(r, status.Connected)
})
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc3",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{{PeerName: dialingPeerName}},
},
},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(codec3, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}
execNoNodesToken := createTokenWithPolicyName(t, codec1, "no-nodes", `service_prefix "foo" { policy = "read" }`, "root")
rules := `
@ -1485,9 +1582,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Set up some nodes in each DC that host the service.
{
for i := 0; i < 10; i++ {
for _, dc := range []string{"dc1", "dc2"} {
for _, d := range []struct {
codec rpc.ClientCodec
dc string
}{
{codec1, "dc1"},
{codec2, "dc2"},
{codec3, "dc3"},
} {
req := structs.RegisterRequest{
Datacenter: dc,
Datacenter: d.dc,
Node: fmt.Sprintf("node%d", i+1),
Address: fmt.Sprintf("127.0.0.%d", i+1),
NodeMeta: map[string]string{
@ -1497,7 +1601,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
Service: &structs.NodeService{
Service: "foo",
Port: 8000,
Tags: []string{dc, fmt.Sprintf("tag%d", i+1)},
Tags: []string{d.dc, fmt.Sprintf("tag%d", i+1)},
Meta: map[string]string{
"svc-group": fmt.Sprintf("%d", i%2),
"foo": "true",
@ -1510,15 +1614,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
req.Service.Meta["unique"] = "true"
}
var codec rpc.ClientCodec
if dc == "dc1" {
codec = codec1
} else {
codec = codec2
}
var reply struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(d.codec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -1576,6 +1673,17 @@ func TestPreparedQuery_Execute(t *testing.T) {
assert.True(t, reply.QueryMeta.KnownLeader)
}
expectFailoverPeerNodes := func(t *testing.T, query *structs.PreparedQueryRequest, reply *structs.PreparedQueryExecuteResponse, n int) {
t.Helper()
assert.Len(t, reply.Nodes, n)
assert.Equal(t, "", reply.Datacenter)
assert.Equal(t, acceptingPeerName, reply.PeerName)
assert.Equal(t, 2, reply.Failovers)
assert.Equal(t, query.Query.Service.Service, reply.Service)
assert.Equal(t, query.Query.DNS, reply.DNS)
assert.True(t, reply.QueryMeta.KnownLeader)
}
t.Run("run the registered query", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
@ -1962,10 +2070,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
// Update the health of a node to mark it critical.
setHealth := func(t *testing.T, node string, health string) {
setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) {
t.Helper()
req := structs.RegisterRequest{
Datacenter: "dc1",
Datacenter: dc,
Node: node,
Address: "127.0.0.1",
Service: &structs.NodeService{
@ -1981,9 +2089,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var reply struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "Catalog.Register", &req, &reply))
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
}
setHealth(t, "node1", api.HealthCritical)
setHealth(t, codec1, "dc1", "node1", api.HealthCritical)
// The failing node should be filtered.
t.Run("failing node filtered", func(t *testing.T) {
@ -2003,7 +2111,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
})
// Upgrade it to a warning and re-query, should be 10 nodes again.
setHealth(t, "node1", api.HealthWarning)
setHealth(t, codec1, "dc1", "node1", api.HealthWarning)
t.Run("warning nodes are included", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
@ -2173,7 +2281,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Now fail everything in dc1 and we should get an empty list back.
for i := 0; i < 10; i++ {
setHealth(t, fmt.Sprintf("node%d", i+1), api.HealthCritical)
setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical)
}
t.Run("everything is failing so should get empty list", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
@ -2308,6 +2416,61 @@ func TestPreparedQuery_Execute(t *testing.T) {
assert.NotEqual(t, "node3", node.Node.Node)
}
})
// Modify the query to have it fail over to a bogus DC and then dc2.
query.Query.Service.Failover = structs.QueryFailoverOptions{
Targets: []structs.QueryFailoverTarget{
{Datacenter: "dc2"},
{PeerName: acceptingPeerName},
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
// Ensure the foo service has fully replicated.
retry.Run(t, func(r *retry.R) {
_, nodes, err := s1.fsm.State().CheckServiceNodes(nil, "foo", nil, acceptingPeerName)
require.NoError(r, err)
require.Len(r, nodes, 10)
})
// Now we should see 9 nodes from dc2
t.Run("failing over to cluster peers", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
}
expectFailoverNodes(t, &query, &reply, 9)
})
// Set all checks in dc2 as critical
for i := 0; i < 10; i++ {
setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical)
}
// Now we should see 9 nodes from dc3 (we have the tag filter still)
t.Run("failing over to cluster peers", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
}
expectFailoverPeerNodes(t, &query, &reply, 9)
})
}
func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
@ -2724,7 +2887,9 @@ func TestPreparedQuery_Wrapper(t *testing.T) {
joinWAN(t, s2, s1)
// Try all the operations on a real server via the wrapper.
wrapper := &queryServerWrapper{s1}
wrapper := &queryServerWrapper{srv: s1, executeRemote: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
return nil
}}
wrapper.GetLogger().Debug("Test")
ret, err := wrapper.GetOtherDatacentersByDistance()
@ -2746,7 +2911,7 @@ type mockQueryServer struct {
Datacenters []string
DatacentersError error
QueryLog []string
QueryFn func(dc string, args interface{}, reply interface{}) error
QueryFn func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error
Logger hclog.Logger
LogBuffer *bytes.Buffer
}
@ -2768,17 +2933,27 @@ func (m *mockQueryServer) GetLogger() hclog.Logger {
return m.Logger
}
func (m *mockQueryServer) GetLocalDC() string {
return "dc1"
}
func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
return m.Datacenters, m.DatacentersError
}
func (m *mockQueryServer) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, method))
if ret, ok := reply.(*structs.PreparedQueryExecuteResponse); ok {
ret.Datacenter = dc
func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
peerName := args.Query.Service.PeerName
dc := args.Datacenter
if peerName != "" {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("peer:%s", peerName))
} else {
m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, "PreparedQuery.ExecuteRemote"))
}
reply.PeerName = peerName
reply.Datacenter = dc
if m.QueryFn != nil {
return m.QueryFn(dc, args, reply)
return m.QueryFn(args, reply)
}
return nil
}
@ -2788,7 +2963,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
query := &structs.PreparedQuery{
Name: "test",
Service: structs.ServiceQuery{
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
NearestN: 0,
Datacenters: []string{""},
},
@ -2862,10 +3037,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "dc1" {
ret.Nodes = nodes()
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "dc1" {
reply.Nodes = nodes()
}
return nil
},
@ -2890,10 +3064,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "dc3" {
ret.Nodes = nodes()
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "dc3" {
reply.Nodes = nodes()
}
return nil
},
@ -2926,7 +3099,7 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
}
if len(reply.Nodes) != 0 ||
reply.Datacenter != "xxx" || reply.Failovers != 4 {
t.Fatalf("bad: %v", reply)
t.Fatalf("bad: %+v", reply)
}
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" {
t.Fatalf("bad: %s", queries)
@ -2940,10 +3113,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "dc4" {
ret.Nodes = nodes()
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "dc4" {
reply.Nodes = nodes()
}
return nil
},
@ -2969,10 +3141,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "dc4" {
ret.Nodes = nodes()
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "dc4" {
reply.Nodes = nodes()
}
return nil
},
@ -2998,10 +3169,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "dc4" {
ret.Nodes = nodes()
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "dc4" {
reply.Nodes = nodes()
}
return nil
},
@ -3029,12 +3199,11 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "dc1" {
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "dc1" {
return fmt.Errorf("XXX")
} else if dc == "dc4" {
ret.Nodes = nodes()
} else if req.Datacenter == "dc4" {
reply.Nodes = nodes()
}
return nil
},
@ -3063,10 +3232,9 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, _ interface{}, reply interface{}) error {
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "xxx" {
ret.Nodes = nodes()
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "xxx" {
reply.Nodes = nodes()
}
return nil
},
@ -3092,17 +3260,15 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
{
mock := &mockQueryServer{
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
QueryFn: func(dc string, args interface{}, reply interface{}) error {
inp := args.(*structs.PreparedQueryExecuteRemoteRequest)
ret := reply.(*structs.PreparedQueryExecuteResponse)
if dc == "xxx" {
if inp.Limit != 5 {
t.Fatalf("bad: %d", inp.Limit)
QueryFn: func(req *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if req.Datacenter == "xxx" {
if req.Limit != 5 {
t.Fatalf("bad: %d", req.Limit)
}
if inp.RequireConsistent != true {
t.Fatalf("bad: %v", inp.RequireConsistent)
if req.RequireConsistent != true {
t.Fatalf("bad: %v", req.RequireConsistent)
}
ret.Nodes = nodes()
reply.Nodes = nodes()
}
return nil
},
@ -3124,4 +3290,32 @@ func TestPreparedQuery_queryFailover(t *testing.T) {
t.Fatalf("bad: %s", queries)
}
}
// Failover returns data from the first cluster peer with data.
query.Service.Failover.Datacenters = nil
query.Service.Failover.Targets = []structs.QueryFailoverTarget{
{PeerName: "cluster-01"},
{Datacenter: "dc44"},
{PeerName: "cluster-02"},
}
{
mock := &mockQueryServer{
Datacenters: []string{"dc44"},
QueryFn: func(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error {
if args.Query.Service.PeerName == "cluster-02" {
reply.Nodes = nodes()
}
return nil
},
}
var reply structs.PreparedQueryExecuteResponse
if err := queryFailover(mock, query, &structs.PreparedQueryExecuteRequest{}, &reply); err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, "cluster-02", reply.PeerName)
require.Equal(t, 3, reply.Failovers)
require.Equal(t, nodes(), reply.Nodes)
require.Equal(t, "peer:cluster-01|dc44:PreparedQuery.ExecuteRemote|peer:cluster-02", mock.JoinQueryLog())
}
}

View File

@ -6075,7 +6075,7 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) {
Name: "my-query",
Service: structs.ServiceQuery{
Service: "db",
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
Datacenters: []string{"dc2"},
},
},

View File

@ -92,7 +92,7 @@ func TestPreparedQuery_Create(t *testing.T) {
Session: "my-session",
Service: structs.ServiceQuery{
Service: "my-service",
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
NearestN: 4,
Datacenters: []string{"dc1", "dc2"},
},
@ -883,7 +883,7 @@ func TestPreparedQuery_Update(t *testing.T) {
Session: "my-session",
Service: structs.ServiceQuery{
Service: "my-service",
Failover: structs.QueryDatacenterOptions{
Failover: structs.QueryFailoverOptions{
NearestN: 4,
Datacenters: []string{"dc1", "dc2"},
},

View File

@ -10,9 +10,9 @@ import (
"github.com/hashicorp/consul/types"
)
// QueryDatacenterOptions sets options about how we fail over if there are no
// QueryFailoverOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter.
type QueryDatacenterOptions struct {
type QueryFailoverOptions struct {
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
NearestN int
@ -21,6 +21,32 @@ type QueryDatacenterOptions struct {
// never try a datacenter multiple times, so those are subtracted from
// this list before proceeding.
Datacenters []string
// Targets is a fixed list of datacenters and peers to try. This field cannot
// be populated with NearestN or Datacenters.
Targets []QueryFailoverTarget
}
// AsTargets either returns Targets as is or Datacenters converted into
// Targets.
func (f *QueryFailoverOptions) AsTargets() []QueryFailoverTarget {
if dcs := f.Datacenters; len(dcs) > 0 {
var targets []QueryFailoverTarget
for _, dc := range dcs {
targets = append(targets, QueryFailoverTarget{Datacenter: dc})
}
return targets
}
return f.Targets
}
type QueryFailoverTarget struct {
// PeerName specifies a peer to try during failover.
PeerName string
// Datacenter specifies a datacenter to try during failover.
Datacenter string
}
// QueryDNSOptions controls settings when query results are served over DNS.
@ -37,7 +63,7 @@ type ServiceQuery struct {
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryDatacenterOptions
Failover QueryFailoverOptions
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
@ -323,6 +349,9 @@ type PreparedQueryExecuteResponse struct {
// Datacenter is the datacenter that these results came from.
Datacenter string
// PeerName specifies the cluster peer that these results came from.
PeerName string
// Failovers is a count of how many times we had to query a remote
// datacenter.
Failovers int

View File

@ -1,8 +1,8 @@
package api
// QueryDatacenterOptions sets options about how we fail over if there are no
// QueryFailoverOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter.
type QueryDatacenterOptions struct {
type QueryFailoverOptions struct {
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
NearestN int
@ -11,6 +11,18 @@ type QueryDatacenterOptions struct {
// never try a datacenter multiple times, so those are subtracted from
// this list before proceeding.
Datacenters []string
// Targets is a fixed list of datacenters and peers to try. This field cannot
// be populated with NearestN or Datacenters.
Targets []QueryFailoverTarget
}
type QueryFailoverTarget struct {
// PeerName specifies a peer to try during failover.
PeerName string
// Datacenter specifies a datacenter to try during failover.
Datacenter string
}
// QueryDNSOptions controls settings when query results are served over DNS.
@ -35,7 +47,7 @@ type ServiceQuery struct {
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryDatacenterOptions
Failover QueryFailoverOptions
// IgnoreCheckIDs is an optional list of health check IDs to ignore when
// considering which nodes are healthy. It is useful as an emergency measure