mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
Adds sort of DCs in catalog queries based on RTT. Cleans up.
* Makes the catalog endpoint respect disabling coordinates for all RTT-sorting query types.
This commit is contained in:
parent
89c7203f31
commit
36c78f5042
@ -107,6 +107,11 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
|
|||||||
|
|
||||||
// Sort the DCs
|
// Sort the DCs
|
||||||
sort.Strings(dcs)
|
sort.Strings(dcs)
|
||||||
|
if !c.srv.config.DisableCoordinates {
|
||||||
|
if err := c.srv.sortDatacentersByDistance(dcs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return
|
// Return
|
||||||
*reply = dcs
|
*reply = dcs
|
||||||
@ -132,7 +137,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
|||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.Nodes = index, nodes
|
reply.Index, reply.Nodes = index, nodes
|
||||||
return c.srv.sortByDistanceFrom(args.Source, reply.Nodes)
|
if c.srv.config.DisableCoordinates {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,7 +200,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
|||||||
if err := c.srv.filterACL(args.Token, reply); err != nil {
|
if err := c.srv.filterACL(args.Token, reply); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return c.srv.sortByDistanceFrom(args.Source, reply.ServiceNodes)
|
if c.srv.config.DisableCoordinates {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.srv.sortNodesByDistanceFrom(args.Source, reply.ServiceNodes)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Provide some metrics
|
// Provide some metrics
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -234,9 +233,7 @@ func TestCatalogListDatacenters(t *testing.T) {
|
|||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort the dcs
|
// The DCs should come out sorted by default.
|
||||||
sort.Strings(out)
|
|
||||||
|
|
||||||
if len(out) != 2 {
|
if len(out) != 2 {
|
||||||
t.Fatalf("bad: %v", out)
|
t.Fatalf("bad: %v", out)
|
||||||
}
|
}
|
||||||
@ -248,6 +245,75 @@ func TestCatalogListDatacenters(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCatalogListDatacenters_DistanceSort(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServerDC(t, "dc2")
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
|
||||||
|
dir3, s3 := testServerDC(t, "acdc")
|
||||||
|
defer os.RemoveAll(dir3)
|
||||||
|
defer s3.Shutdown()
|
||||||
|
|
||||||
|
// Try to join
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := s2.JoinWAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := s3.JoinWAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||||
|
|
||||||
|
var out []string
|
||||||
|
if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It's super hard to force the Serfs into a known configuration of
|
||||||
|
// coordinates, so the best we can do is make sure that the sorting
|
||||||
|
// function is getting called (it's tested extensively in rtt_test.go).
|
||||||
|
// Since this is relative to dc1, it will be listed first (proving we
|
||||||
|
// went into the sort fn) and the other two will be sorted by name since
|
||||||
|
// there are no known coordinates for them.
|
||||||
|
if len(out) != 3 {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out[0] != "dc1" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out[1] != "acdc" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out[2] != "dc2" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we get the natural order if coordinates are disabled.
|
||||||
|
s1.config.DisableCoordinates = true
|
||||||
|
if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(out) != 3 {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out[0] != "acdc" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out[1] != "dc1" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out[2] != "dc2" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCatalogListNodes(t *testing.T) {
|
func TestCatalogListNodes(t *testing.T) {
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
@ -543,6 +609,34 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) {
|
|||||||
if out.Nodes[4].Node != s1.config.NodeName {
|
if out.Nodes[4].Node != s1.config.NodeName {
|
||||||
t.Fatalf("bad: %v", out)
|
t.Fatalf("bad: %v", out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure we get the natural order if coordinates are disabled.
|
||||||
|
s1.config.DisableCoordinates = true
|
||||||
|
args = structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"},
|
||||||
|
}
|
||||||
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
|
client.Call("Catalog.ListNodes", &args, &out)
|
||||||
|
return len(out.Nodes) == 5, nil
|
||||||
|
}, func(err error) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
})
|
||||||
|
if out.Nodes[0].Node != "aaa" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.Nodes[1].Node != "bar" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.Nodes[2].Node != "baz" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.Nodes[3].Node != "foo" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.Nodes[4].Node != s1.config.NodeName {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkCatalogListNodes(t *testing.B) {
|
func BenchmarkCatalogListNodes(t *testing.B) {
|
||||||
@ -888,6 +982,32 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) {
|
|||||||
if out.ServiceNodes[3].Node != "aaa" {
|
if out.ServiceNodes[3].Node != "aaa" {
|
||||||
t.Fatalf("bad: %v", out)
|
t.Fatalf("bad: %v", out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure we get the natural order if coordinates are disabled.
|
||||||
|
s1.config.DisableCoordinates = true
|
||||||
|
args = structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "db",
|
||||||
|
Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"},
|
||||||
|
}
|
||||||
|
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.ServiceNodes) != 4 {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.ServiceNodes[0].Node != "aaa" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.ServiceNodes[1].Node != "foo" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.ServiceNodes[2].Node != "bar" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
if out.ServiceNodes[3].Node != "baz" {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCatalogNodeServices(t *testing.T) {
|
func TestCatalogNodeServices(t *testing.T) {
|
||||||
|
@ -296,6 +296,10 @@ func DefaultConfig() *Config {
|
|||||||
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
|
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
|
||||||
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
|
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
|
||||||
|
|
||||||
|
// Cache coordinates for the WAN since the number of servers is small,
|
||||||
|
// and because we don't store these in the database.
|
||||||
|
conf.SerfWANConfig.CacheCoordinates = true
|
||||||
|
|
||||||
// Disable shutdown on removal
|
// Disable shutdown on removal
|
||||||
conf.RaftConfig.ShutdownOnRemove = false
|
conf.RaftConfig.ShutdownOnRemove = false
|
||||||
|
|
||||||
|
136
consul/rtt.go
136
consul/rtt.go
@ -110,9 +110,11 @@ func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interfac
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sortByDistanceFrom is used to sort results from our service catalog based on the
|
// sortNodesByDistanceFrom is used to sort results from our service catalog based
|
||||||
// distance (RTT) from the given source node.
|
// on the round trip time from the given source node. Nodes with missing coordinates
|
||||||
func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}) error {
|
// will get stable sorted at the end of the list.
|
||||||
|
|
||||||
|
func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error {
|
||||||
// We can't compare coordinates across DCs.
|
// We can't compare coordinates across DCs.
|
||||||
if source.Datacenter != s.config.Datacenter {
|
if source.Datacenter != s.config.Datacenter {
|
||||||
return nil
|
return nil
|
||||||
@ -137,3 +139,131 @@ func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}
|
|||||||
sort.Stable(sorter)
|
sort.Stable(sorter)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serfer provides the coordinate information we need from the Server in an
|
||||||
|
// interface that's easy to mock out for testing. Without this, we'd have to
|
||||||
|
// do some really painful setup to get good unit test coverage of all the cases.
|
||||||
|
type serfer interface {
|
||||||
|
GetDatacenter() string
|
||||||
|
GetCoordinate() (*coordinate.Coordinate, error)
|
||||||
|
GetCachedCoordinate(node string) (*coordinate.Coordinate, bool)
|
||||||
|
GetNodesForDatacenter(dc string) []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// serverSerfer wraps a Server with the serfer interface.
|
||||||
|
type serverSerfer struct {
|
||||||
|
server *Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *serverSerfer) GetDatacenter() string {
|
||||||
|
return s.server.config.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *serverSerfer) GetCoordinate() (*coordinate.Coordinate, error) {
|
||||||
|
return s.server.serfWAN.GetCoordinate()
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *serverSerfer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
|
||||||
|
return s.server.serfWAN.GetCachedCoordinate(node)
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *serverSerfer) GetNodesForDatacenter(dc string) []string {
|
||||||
|
nodes := make([]string, 0)
|
||||||
|
for _, part := range s.server.remoteConsuls[dc] {
|
||||||
|
nodes = append(nodes, part.Name)
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
// sortDatacentersByDistance will sort the given list of DCs based on the
|
||||||
|
// median RTT to all nodes we know about from the WAN gossip pool). DCs with
|
||||||
|
// missing coordinates will be stable sorted to the end of the list.
|
||||||
|
func (s *Server) sortDatacentersByDistance(dcs []string) error {
|
||||||
|
serfer := serverSerfer{s}
|
||||||
|
return sortDatacentersByDistance(&serfer, dcs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getDatacenterDistance will return the median round trip time estimate for
|
||||||
|
// the given DC from the given serfer, in seconds. This will return positive
|
||||||
|
// infinity if no coordinates are available.
|
||||||
|
func getDatacenterDistance(s serfer, dc string) (float64, error) {
|
||||||
|
// If this is the serfer's DC then just bail with zero RTT.
|
||||||
|
if dc == s.GetDatacenter() {
|
||||||
|
return 0.0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise measure from the serfer to the nodes in the other DC.
|
||||||
|
coord, err := s.GetCoordinate()
|
||||||
|
if err != nil {
|
||||||
|
return 0.0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch all the nodes in the DC.
|
||||||
|
nodes := s.GetNodesForDatacenter(dc)
|
||||||
|
subvec := make([]float64, len(nodes))
|
||||||
|
for j, node := range nodes {
|
||||||
|
if other, ok := s.GetCachedCoordinate(node); ok {
|
||||||
|
subvec[j] = computeDistance(coord, other)
|
||||||
|
} else {
|
||||||
|
subvec[j] = computeDistance(coord, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the median by sorting and taking the middle item.
|
||||||
|
sort.Float64s(subvec)
|
||||||
|
fmt.Println("%v", subvec)
|
||||||
|
if len(subvec) > 0 {
|
||||||
|
return subvec[len(subvec)/2], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return computeDistance(coord, nil), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// datacenterSorter takes a list of DC names and a parallel vector of distances
|
||||||
|
// and implements sort.Interface, keeping both structures coherent and sorting
|
||||||
|
// by distance.
|
||||||
|
type datacenterSorter struct {
|
||||||
|
Names []string
|
||||||
|
Vec []float64
|
||||||
|
}
|
||||||
|
|
||||||
|
// See sort.Interface.
|
||||||
|
func (n *datacenterSorter) Len() int {
|
||||||
|
return len(n.Names)
|
||||||
|
}
|
||||||
|
|
||||||
|
// See sort.Interface.
|
||||||
|
func (n *datacenterSorter) Swap(i, j int) {
|
||||||
|
n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
|
||||||
|
n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// See sort.Interface.
|
||||||
|
func (n *datacenterSorter) Less(i, j int) bool {
|
||||||
|
return n.Vec[i] < n.Vec[j]
|
||||||
|
}
|
||||||
|
|
||||||
|
// sortDatacentersByDistance will sort the given list of DCs based on the
|
||||||
|
// median RTT to all nodes the given serfer knows about from the WAN gossip
|
||||||
|
// pool). DCs with missing coordinates will be stable sorted to the end of the
|
||||||
|
// list.
|
||||||
|
func sortDatacentersByDistance(s serfer, dcs []string) error {
|
||||||
|
// Build up a list of median distances to the other DCs.
|
||||||
|
vec := make([]float64, len(dcs))
|
||||||
|
for i, dc := range dcs {
|
||||||
|
rtt, err := getDatacenterDistance(s, dc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
vec[i] = rtt
|
||||||
|
}
|
||||||
|
|
||||||
|
sorter := &datacenterSorter{dcs, vec}
|
||||||
|
sort.Stable(sorter)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@ -96,7 +97,7 @@ func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) {
|
|||||||
time.Sleep(2 * server.config.CoordinateUpdatePeriod)
|
time.Sleep(2 * server.config.CoordinateUpdatePeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) {
|
||||||
dir, server := testServer(t)
|
dir, server := testServer(t)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer server.Shutdown()
|
defer server.Shutdown()
|
||||||
@ -117,7 +118,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
|||||||
|
|
||||||
// The zero value for the source should not trigger any sorting.
|
// The zero value for the source should not trigger any sorting.
|
||||||
var source structs.QuerySource
|
var source structs.QuerySource
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||||
@ -125,7 +126,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
|||||||
// Same for a source in some other DC.
|
// Same for a source in some other DC.
|
||||||
source.Node = "node1"
|
source.Node = "node1"
|
||||||
source.Datacenter = "dc2"
|
source.Datacenter = "dc2"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||||
@ -133,7 +134,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
|||||||
// Same for a source node in our DC that we have no coordinate for.
|
// Same for a source node in our DC that we have no coordinate for.
|
||||||
source.Node = "apple"
|
source.Node = "apple"
|
||||||
source.Datacenter = "dc1"
|
source.Datacenter = "dc1"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||||
@ -143,7 +144,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
|||||||
// its lexical hegemony.
|
// its lexical hegemony.
|
||||||
source.Node = "node1"
|
source.Node = "node1"
|
||||||
source.Datacenter = "dc1"
|
source.Datacenter = "dc1"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
|
verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
|
||||||
@ -153,7 +154,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
|||||||
// they were in from the previous sort.
|
// they were in from the previous sort.
|
||||||
source.Node = "node2"
|
source.Node = "node2"
|
||||||
source.Datacenter = "dc1"
|
source.Datacenter = "dc1"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
|
verifyNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
|
||||||
@ -161,13 +162,13 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
|
|||||||
// Let's exercise the stable sort explicitly to make sure we didn't
|
// Let's exercise the stable sort explicitly to make sure we didn't
|
||||||
// just get lucky.
|
// just get lucky.
|
||||||
nodes[1], nodes[2] = nodes[2], nodes[1]
|
nodes[1], nodes[2] = nodes[2], nodes[1]
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
|
verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) {
|
||||||
dir, server := testServer(t)
|
dir, server := testServer(t)
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer server.Shutdown()
|
defer server.Shutdown()
|
||||||
@ -188,7 +189,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
|||||||
|
|
||||||
// The zero value for the source should not trigger any sorting.
|
// The zero value for the source should not trigger any sorting.
|
||||||
var source structs.QuerySource
|
var source structs.QuerySource
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||||
@ -196,7 +197,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
|||||||
// Same for a source in some other DC.
|
// Same for a source in some other DC.
|
||||||
source.Node = "node1"
|
source.Node = "node1"
|
||||||
source.Datacenter = "dc2"
|
source.Datacenter = "dc2"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||||
@ -204,7 +205,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
|||||||
// Same for a source node in our DC that we have no coordinate for.
|
// Same for a source node in our DC that we have no coordinate for.
|
||||||
source.Node = "apple"
|
source.Node = "apple"
|
||||||
source.Datacenter = "dc1"
|
source.Datacenter = "dc1"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||||
@ -214,7 +215,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
|||||||
// its lexical hegemony.
|
// its lexical hegemony.
|
||||||
source.Node = "node1"
|
source.Node = "node1"
|
||||||
source.Datacenter = "dc1"
|
source.Datacenter = "dc1"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyServiceNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
|
verifyServiceNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
|
||||||
@ -224,7 +225,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
|||||||
// they were in from the previous sort.
|
// they were in from the previous sort.
|
||||||
source.Node = "node2"
|
source.Node = "node2"
|
||||||
source.Datacenter = "dc1"
|
source.Datacenter = "dc1"
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyServiceNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
|
verifyServiceNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
|
||||||
@ -232,8 +233,141 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
|
|||||||
// Let's exercise the stable sort explicitly to make sure we didn't
|
// Let's exercise the stable sort explicitly to make sure we didn't
|
||||||
// just get lucky.
|
// just get lucky.
|
||||||
nodes[1], nodes[2] = nodes[2], nodes[1]
|
nodes[1], nodes[2] = nodes[2], nodes[1]
|
||||||
if err := server.sortByDistanceFrom(source, nodes); err != nil {
|
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
|
verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// mockNodeMap is keyed by node name and the values are the coordinates of the
|
||||||
|
// node.
|
||||||
|
type mockNodeMap map[string]*coordinate.Coordinate
|
||||||
|
|
||||||
|
// mockServer is used to provide a serfer interface for unit tests. The key is
|
||||||
|
// DC, which selects a map from node name to coordinate for that node.
|
||||||
|
type mockServer map[string]mockNodeMap
|
||||||
|
|
||||||
|
// newMockServer is used to generate a serfer interface that presents a known DC
|
||||||
|
// topology for unit tests. The server is in dc0.
|
||||||
|
//
|
||||||
|
// Here's the layout of the nodes:
|
||||||
|
//
|
||||||
|
// /---- dc1 ----\ /- dc2 -\ /- dc0 -\
|
||||||
|
// node2 node1 node3 node1 node1
|
||||||
|
// | | | | | | | | | | |
|
||||||
|
// 0 1 2 3 4 5 6 7 8 9 10 (ms)
|
||||||
|
//
|
||||||
|
// We also include a node4 in dc1 with no known coordinate, as well as a
|
||||||
|
// mysterious dcX with no nodes with known coordinates.
|
||||||
|
//
|
||||||
|
func newMockServer() *mockServer {
|
||||||
|
s := make(mockServer)
|
||||||
|
s["dc0"] = mockNodeMap{
|
||||||
|
"dc0.node1": generateCoordinate(10 * time.Millisecond),
|
||||||
|
}
|
||||||
|
s["dc1"] = mockNodeMap{
|
||||||
|
"dc1.node1": generateCoordinate(3 * time.Millisecond),
|
||||||
|
"dc1.node2": generateCoordinate(2 * time.Millisecond),
|
||||||
|
"dc1.node3": generateCoordinate(5 * time.Millisecond),
|
||||||
|
"dc1.node4": nil, // no known coordinate
|
||||||
|
}
|
||||||
|
s["dc2"] = mockNodeMap{
|
||||||
|
"dc2.node1": generateCoordinate(8 * time.Millisecond),
|
||||||
|
}
|
||||||
|
s["dcX"] = mockNodeMap{
|
||||||
|
"dcX.node1": nil, // no known coordinate
|
||||||
|
}
|
||||||
|
return &s
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *mockServer) GetDatacenter() string {
|
||||||
|
return "dc0"
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *mockServer) GetCoordinate() (*coordinate.Coordinate, error) {
|
||||||
|
return (*s)["dc0"]["dc0.node1"], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *mockServer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
|
||||||
|
for _, nodes := range *s {
|
||||||
|
for n, coord := range nodes {
|
||||||
|
if n == node && coord != nil {
|
||||||
|
return coord, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// See serfer.
|
||||||
|
func (s *mockServer) GetNodesForDatacenter(dc string) []string {
|
||||||
|
nodes := make([]string, 0)
|
||||||
|
if n, ok := (*s)[dc]; ok {
|
||||||
|
for name := range n {
|
||||||
|
nodes = append(nodes, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRtt_getDatacenterDistance(t *testing.T) {
|
||||||
|
s := newMockServer()
|
||||||
|
|
||||||
|
// The serfer's own DC is always 0 ms away.
|
||||||
|
if dist, err := getDatacenterDistance(s, "dc0"); err != nil || dist != 0.0 {
|
||||||
|
t.Fatalf("bad: %v err: %v", dist, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check a DC with no coordinates, which should give positive infinity.
|
||||||
|
if dist, err := getDatacenterDistance(s, "dcX"); err != nil || dist != math.Inf(1.0) {
|
||||||
|
t.Fatalf("bad: %v err: %v", dist, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Similar for a totally unknown DC.
|
||||||
|
if dist, err := getDatacenterDistance(s, "acdc"); err != nil || dist != math.Inf(1.0) {
|
||||||
|
t.Fatalf("bad: %v err: %v", dist, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the trivial median case (just one node).
|
||||||
|
if dist, err := getDatacenterDistance(s, "dc2"); err != nil || dist != 0.002 {
|
||||||
|
t.Fatalf("bad: %v err: %v", dist, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the more interesting median case, note that there's a mystery
|
||||||
|
// node4 in there that should make the distances sort like this:
|
||||||
|
//
|
||||||
|
// [0] node3 (0.005), [1] node1 (0.007), [2] node2 (0.008), [3] node4 (+Inf)
|
||||||
|
//
|
||||||
|
// So the median should be at index 4 / 2 = 2 -> 0.008.
|
||||||
|
if dist, err := getDatacenterDistance(s, "dc1"); err != nil || dist != 0.008 {
|
||||||
|
t.Fatalf("bad: %v err: %v", dist, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRtt_sortDatacentersByDistance(t *testing.T) {
|
||||||
|
s := newMockServer()
|
||||||
|
|
||||||
|
dcs := []string{"acdc", "dc0", "dc1", "dc2", "dcX"}
|
||||||
|
if err := sortDatacentersByDistance(s, dcs); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := "dc0,dc2,dc1,acdc,dcX"
|
||||||
|
if actual := strings.Join(dcs, ","); actual != expected {
|
||||||
|
t.Fatalf("bad sort: %s != %s", actual, expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the sort is stable and we didn't just get lucky.
|
||||||
|
dcs = []string{"dcX", "dc0", "dc1", "dc2", "acdc"}
|
||||||
|
if err := sortDatacentersByDistance(s, dcs); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected = "dc0,dc2,dc1,dcX,acdc"
|
||||||
|
if actual := strings.Join(dcs, ","); actual != expected {
|
||||||
|
t.Fatalf("bad sort: %s != %s", actual, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -141,7 +141,7 @@ func isConsulServer(m serf.Member) (bool, *serverParts) {
|
|||||||
return true, parts
|
return true, parts
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns if a member is a consul node. Returns a boo,
|
// Returns if a member is a consul node. Returns a bool,
|
||||||
// and the datacenter.
|
// and the datacenter.
|
||||||
func isConsulNode(m serf.Member) (bool, string) {
|
func isConsulNode(m serf.Member) (bool, string) {
|
||||||
if m.Tags["role"] != "node" {
|
if m.Tags["role"] != "node" {
|
||||||
|
@ -159,6 +159,10 @@ If the API call succeeds a 200 status code is returned.
|
|||||||
This endpoint is hit with a GET and is used to return all the
|
This endpoint is hit with a GET and is used to return all the
|
||||||
datacenters that are known by the Consul server.
|
datacenters that are known by the Consul server.
|
||||||
|
|
||||||
|
The datacenters will be sorted in ascending order based on the
|
||||||
|
estimated median round trip time from the server to the servers
|
||||||
|
in that datacenter.
|
||||||
|
|
||||||
It returns a JSON body like this:
|
It returns a JSON body like this:
|
||||||
|
|
||||||
```javascript
|
```javascript
|
||||||
|
Loading…
x
Reference in New Issue
Block a user