mirror of https://github.com/status-im/consul.git
Adds test for remote datacenter selection and query logic.
This commit is contained in:
parent
bbc5185000
commit
d06e2a535d
|
@ -305,7 +305,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
|||
// by the query setup.
|
||||
if len(reply.Nodes) == 0 {
|
||||
wrapper := &queryServerWrapper{p.srv}
|
||||
if err := queryFailover(wrapper, query, args, reply); err != nil {
|
||||
if err := queryFailover(wrapper, query, args.Limit, args.QueryOptions, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -488,7 +488,7 @@ func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, repl
|
|||
// queryFailover runs an algorithm to determine which DCs to try and then calls
|
||||
// them to try to locate alternative services.
|
||||
func queryFailover(q queryServer, query *structs.PreparedQuery,
|
||||
args *structs.PreparedQueryExecuteRequest,
|
||||
limit int, options structs.QueryOptions,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
|
||||
// Pull the list of other DCs. This is sorted by RTT in case the user
|
||||
|
@ -543,10 +543,10 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
|
|||
remote := &structs.PreparedQueryExecuteRemoteRequest{
|
||||
Datacenter: dc,
|
||||
Query: *query,
|
||||
Limit: args.Limit,
|
||||
QueryOptions: args.QueryOptions,
|
||||
Limit: limit,
|
||||
QueryOptions: options,
|
||||
}
|
||||
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, &remote, &reply); err != nil {
|
||||
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
|
||||
q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -1800,3 +1802,380 @@ func TestPreparedQuery_Wrapper(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
type mockQueryServer struct {
|
||||
Datacenters []string
|
||||
DatacentersError error
|
||||
QueryLog []string
|
||||
QueryFn func(dc string, args interface{}, reply interface{}) error
|
||||
Logger *log.Logger
|
||||
LogBuffer *bytes.Buffer
|
||||
}
|
||||
|
||||
func (m *mockQueryServer) JoinQueryLog() string {
|
||||
return strings.Join(m.QueryLog, "|")
|
||||
}
|
||||
|
||||
func (m *mockQueryServer) GetLogger() *log.Logger {
|
||||
if m.Logger == nil {
|
||||
m.LogBuffer = new(bytes.Buffer)
|
||||
m.Logger = log.New(m.LogBuffer, "", 0)
|
||||
}
|
||||
return m.Logger
|
||||
}
|
||||
|
||||
func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) {
|
||||
return m.Datacenters, m.DatacentersError
|
||||
}
|
||||
|
||||
func (m *mockQueryServer) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
|
||||
m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, method))
|
||||
if ret, ok := reply.(*structs.PreparedQueryExecuteResponse); ok {
|
||||
ret.Datacenter = dc
|
||||
}
|
||||
if m.QueryFn != nil {
|
||||
return m.QueryFn(dc, args, reply)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreparedQuery_queryFailover(t *testing.T) {
|
||||
query := &structs.PreparedQuery{
|
||||
Service: structs.ServiceQuery{
|
||||
Failover: structs.QueryDatacenterOptions{
|
||||
NearestN: 0,
|
||||
Datacenters: []string{""},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
nodes := func() structs.CheckServiceNodes {
|
||||
return structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node1"},
|
||||
},
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node2"},
|
||||
},
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node3"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Datacenters are available but the query doesn't use them.
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
// Make it fail to get datacenters.
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
DatacentersError: fmt.Errorf("XXX"),
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply)
|
||||
if err == nil || !strings.Contains(err.Error(), "XXX") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
// The query wants to use other datacenters but none are available.
|
||||
query.Service.Failover.NearestN = 3
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
}
|
||||
|
||||
// Try the first three nearest datacenters, first one has the data.
|
||||
query.Service.Failover.NearestN = 3
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "dc1" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "dc1" || reply.Failovers != 1 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
|
||||
// Try the first three nearest datacenters, last one has the data.
|
||||
query.Service.Failover.NearestN = 3
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "dc3" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "dc3" || reply.Failovers != 3 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
|
||||
// Try the first four nearest datacenters, nobody has the data.
|
||||
query.Service.Failover.NearestN = 4
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 0 ||
|
||||
reply.Datacenter != "xxx" || reply.Failovers != 4 {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
|
||||
// Try the first two nearest datacenters, plus a user-specified one that
|
||||
// has the data.
|
||||
query.Service.Failover.NearestN = 2
|
||||
query.Service.Failover.Datacenters = []string{"dc4"}
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "dc4" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "dc4" || reply.Failovers != 3 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
|
||||
// Add in a hard-coded value that overlaps with the nearest list.
|
||||
query.Service.Failover.NearestN = 2
|
||||
query.Service.Failover.Datacenters = []string{"dc4", "dc1"}
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "dc4" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "dc4" || reply.Failovers != 3 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
|
||||
// Now add a bogus user-defined one to the mix.
|
||||
query.Service.Failover.NearestN = 2
|
||||
query.Service.Failover.Datacenters = []string{"nope", "dc4", "dc1"}
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "dc4" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "dc4" || reply.Failovers != 3 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
if !strings.Contains(mock.LogBuffer.String(), "Skipping unknown datacenter") {
|
||||
t.Fatalf("bad: %s", mock.LogBuffer.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Same setup as before but dc1 is going to return an error and should
|
||||
// get skipped over, still yielding data from dc4 which comes later.
|
||||
query.Service.Failover.NearestN = 2
|
||||
query.Service.Failover.Datacenters = []string{"dc4", "dc1"}
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "dc1" {
|
||||
return fmt.Errorf("XXX")
|
||||
} else if dc == "dc4" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "dc4" || reply.Failovers != 3 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
if !strings.Contains(mock.LogBuffer.String(), "Failed querying") {
|
||||
t.Fatalf("bad: %s", mock.LogBuffer.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Just use a hard-coded list and now xxx has the data.
|
||||
query.Service.Failover.NearestN = 0
|
||||
query.Service.Failover.Datacenters = []string{"dc3", "xxx"}
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "xxx" {
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "xxx" || reply.Failovers != 2 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the limit and query options are plumbed through.
|
||||
query.Service.Failover.NearestN = 0
|
||||
query.Service.Failover.Datacenters = []string{"xxx"}
|
||||
{
|
||||
mock := &mockQueryServer{
|
||||
Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"},
|
||||
QueryFn: func(dc string, args interface{}, reply interface{}) error {
|
||||
inp := args.(*structs.PreparedQueryExecuteRemoteRequest)
|
||||
ret := reply.(*structs.PreparedQueryExecuteResponse)
|
||||
if dc == "xxx" {
|
||||
if inp.Limit != 5 {
|
||||
t.Fatalf("bad: %d", inp.Limit)
|
||||
}
|
||||
if inp.RequireConsistent != true {
|
||||
t.Fatalf("bad: %v", inp.RequireConsistent)
|
||||
}
|
||||
ret.Nodes = nodes()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := queryFailover(mock, query, 5, structs.QueryOptions{RequireConsistent: true}, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(reply.Nodes) != 3 ||
|
||||
reply.Datacenter != "xxx" || reply.Failovers != 1 ||
|
||||
!reflect.DeepEqual(reply.Nodes, nodes()) {
|
||||
t.Fatalf("bad: %v", reply)
|
||||
}
|
||||
if queries := mock.JoinQueryLog(); queries != "xxx:PreparedQuery.ExecuteRemote" {
|
||||
t.Fatalf("bad: %s", queries)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue