Skips unknown DCs during queries and chugs along in the face of errors.

This commit is contained in:
James Phillips 2015-11-09 21:13:53 -08:00
parent 2f34b51650
commit 666619dfc9
1 changed files with 37 additions and 12 deletions

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"errors" "errors"
"fmt" "fmt"
"log"
"strings" "strings"
"time" "time"
@ -366,6 +367,7 @@ func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServ
// queryServer is a wrapper that makes it easier to test the failover logic. // queryServer is a wrapper that makes it easier to test the failover logic.
type queryServer interface { type queryServer interface {
GetLogger() *log.Logger
GetOtherDatacentersByDistance() ([]string, error) GetOtherDatacentersByDistance() ([]string, error)
ForwardDC(method, dc string, args interface{}, reply interface{}) error ForwardDC(method, dc string, args interface{}, reply interface{}) error
} }
@ -375,9 +377,9 @@ type queryServerWrapper struct {
srv *Server srv *Server
} }
// ForwardDC calls into the server's RPC forwarder. // GetLogger returns the server's logger.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error { func (q *queryServerWrapper) GetLogger() *log.Logger {
return q.srv.forwardDC(method, dc, args, reply) return q.srv.logger
} }
// GetOtherDatacentersByDistance calls into the server's fn and filters out the // GetOtherDatacentersByDistance calls into the server's fn and filters out the
@ -397,21 +399,35 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
return result, nil return result, nil
} }
// ForwardDC calls into the server's RPC forwarder.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
return q.srv.forwardDC(method, dc, args, reply)
}
// queryFailover runs an algorithm to determine which DCs to try and then calls // queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services. // them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery, func queryFailover(q queryServer, query *structs.PreparedQuery,
args *structs.PreparedQueryExecuteRequest, args *structs.PreparedQueryExecuteRequest,
reply *structs.PreparedQueryExecuteResponse) error { reply *structs.PreparedQueryExecuteResponse) error {
// Build a candidate list of DCs, starting with the nearest N from RTTs. // Pull the list of other DCs. This is sorted by RTT in case the user
// has selected that.
nearest, err := q.GetOtherDatacentersByDistance()
if err != nil {
return err
}
// This will help us filter unknown DCs supplied by the user.
known := make(map[string]struct{})
for _, dc := range nearest {
known[dc] = struct{}{}
}
// Build a candidate list of DCs to try, starting with the nearest N
// from RTTs.
var dcs []string var dcs []string
index := make(map[string]struct{}) index := make(map[string]struct{})
if query.Service.Failover.NearestN > 0 { if query.Service.Failover.NearestN > 0 {
nearest, err := q.GetOtherDatacentersByDistance()
if err != nil {
return err
}
for i, dc := range nearest { for i, dc := range nearest {
if !(i < query.Service.Failover.NearestN) { if !(i < query.Service.Failover.NearestN) {
break break
@ -424,8 +440,16 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// Then add any DCs explicitly listed that weren't selected above. // Then add any DCs explicitly listed that weren't selected above.
for _, dc := range query.Service.Failover.Datacenters { for _, dc := range query.Service.Failover.Datacenters {
_, ok := index[dc] // This will prevent a log of other log spammage if we do not
if !ok { // attempt to talk to datacenters we don't know about.
if _, ok := known[dc]; !ok {
q.GetLogger().Printf("[DEBUG] consul.prepared_query: Skipping unknown datacenter '%s' in prepared query", dc)
continue
}
// This will make sure we don't re-try something that fails
// from the NearestN list.
if _, ok := index[dc]; !ok {
dcs = append(dcs, dc) dcs = append(dcs, dc)
} }
} }
@ -442,7 +466,8 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
QueryOptions: args.QueryOptions, QueryOptions: args.QueryOptions,
} }
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil { if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
return err q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err)
continue
} }
// We can stop if we found some nodes. // We can stop if we found some nodes.