mirror of https://github.com/status-im/consul.git

Adds coordinate sorting support to catalog queries for nodes and service nodes.

parent d734697820
commit 89c7203f31
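For a quick sense of the feature before the diff itself: a minimal sketch of a client exercising the new "?near=" query parameter over the HTTP catalog API. The agent address and the node name "web-1" are illustrative assumptions, not part of this commit:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Ask the local agent for catalog nodes, sorted by estimated RTT
	// from the (hypothetical) node "web-1". Nodes with no known
	// coordinate sort to the end of the list.
	resp, err := http.Get("http://127.0.0.1:8500/v1/catalog/nodes?near=web-1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```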
@@ -60,6 +60,7 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
 func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	// Setup the request
 	args := structs.DCSpecificRequest{}
+	s.parseSource(req, &args.Source)
 	if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
 		return nil, nil
 	}

@@ -90,6 +91,7 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
 func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	// Set default DC
 	args := structs.ServiceSpecificRequest{}
+	s.parseSource(req, &args.Source)
 	if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
 		return nil, nil
 	}
@@ -476,12 +476,12 @@ func DefaultConfig() *Config {
 		DNSConfig: DNSConfig{
 			MaxStale: 5 * time.Second,
 		},
-		StatsitePrefix:       "consul",
-		SyslogFacility:       "LOCAL0",
-		Protocol:             consul.ProtocolVersionMax,
-		CheckUpdateInterval:  5 * time.Minute,
-		AEInterval:           time.Minute,
-		DisableCoordinates:   false,
+		StatsitePrefix:      "consul",
+		SyslogFacility:      "LOCAL0",
+		Protocol:            consul.ProtocolVersionMax,
+		CheckUpdateInterval: 5 * time.Minute,
+		AEInterval:          time.Minute,
+		DisableCoordinates:  false,

 		// SyncCoordinateRateTarget is set based on the rate that we want
 		// the server to handle as an aggregate across the entire cluster.

@@ -490,11 +490,11 @@ func DefaultConfig() *Config {
 		SyncCoordinateRateTarget:  64.0, // updates / second
 		SyncCoordinateIntervalMin: 15 * time.Second,

-		ACLTTL:                    30 * time.Second,
-		ACLDownPolicy:             "extend-cache",
-		ACLDefaultPolicy:          "allow",
-		RetryInterval:             30 * time.Second,
-		RetryIntervalWan:          30 * time.Second,
+		ACLTTL:           30 * time.Second,
+		ACLDownPolicy:    "extend-cache",
+		ACLDefaultPolicy: "allow",
+		RetryInterval:    30 * time.Second,
+		RetryIntervalWan: 30 * time.Second,
 	}
 }
@@ -485,6 +485,16 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
 	*token = s.agent.config.ACLToken
 }

+// parseSource is used to parse the ?near=<node> query parameter, used for
+// sorting by RTT based on a source node. We set the source's DC to the target
+// DC in the request, if given, or else the agent's DC.
+func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
+	s.parseDC(req, &source.Datacenter)
+	if node := req.URL.Query().Get("near"); node != "" {
+		source.Node = node
+	}
+}
+
 // parse is a convenience method for endpoints that need
 // to use both parseWait and parseDC.
 func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
@@ -337,6 +337,51 @@ func testPrettyPrint(pretty string, t *testing.T) {
 	}
 }

+func TestParseSource(t *testing.T) {
+	dir, srv := makeHTTPServer(t)
+	defer os.RemoveAll(dir)
+	defer srv.Shutdown()
+	defer srv.agent.Shutdown()
+
+	// Default is agent's DC and no node (since the user didn't care, just
+	// give them the cheapest possible query).
+	req, err := http.NewRequest("GET",
+		"/v1/catalog/nodes", nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	source := structs.QuerySource{}
+	srv.parseSource(req, &source)
+	if source.Datacenter != "dc1" || source.Node != "" {
+		t.Fatalf("bad: %v", source)
+	}
+
+	// Adding the near parameter should set that node.
+	req, err = http.NewRequest("GET",
+		"/v1/catalog/nodes?near=bob", nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	source = structs.QuerySource{}
+	srv.parseSource(req, &source)
+	if source.Datacenter != "dc1" || source.Node != "bob" {
+		t.Fatalf("bad: %v", source)
+	}
+
+	// We should follow whatever dc parameter was given so that the node is
+	// looked up correctly on the receiving end.
+	req, err = http.NewRequest("GET",
+		"/v1/catalog/nodes?near=bob&dc=foo", nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	source = structs.QuerySource{}
+	srv.parseSource(req, &source)
+	if source.Datacenter != "foo" || source.Node != "bob" {
+		t.Fatalf("bad: %v", source)
+	}
+}
+
 func TestParseWait(t *testing.T) {
 	resp := httptest.NewRecorder()
 	var b structs.QueryOptions
@@ -27,8 +27,8 @@ const (
 	aeScaleThreshold = 128
 )

-// aeScale is used to scale the time interval at which anti-entropy and coordinate
-// updates take place. It is used to prevent saturation as the cluster size grows.
+// aeScale is used to scale the time interval at which anti-entropy updates take
+// place. It is used to prevent saturation as the cluster size grows.
 func aeScale(interval time.Duration, n int) time.Duration {
 	// Don't scale until we cross the threshold
 	if n <= aeScaleThreshold {

@@ -25,7 +25,7 @@ func TestAEScale(t *testing.T) {
 }

 func TestRateScaledInterval(t *testing.T) {
-	min := 1*time.Second
+	min := 1 * time.Second
 	rate := 200.0
 	if v := rateScaledInterval(rate, min, 0); v != min {
 		t.Fatalf("Bad: %v", v)
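The test above exercises rateScaledInterval, which is not itself part of this diff. A plausible sketch, assuming only the behavior the test checks (a floor of min, and an interval that scales linearly with cluster size n against the target aggregate rate):

```go
package agent

import "time"

// rateScaledInterval is a guess at the helper the test exercises: pick an
// interval so that n nodes, each updating once per interval, produce roughly
// `rate` updates per second in aggregate, with `min` as a floor.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(time.Second) * float64(n) / rate)
	if interval < min {
		return min
	}
	return interval
}
```

With rate = 200.0 and n = 0 this returns min, which is exactly what the test asserts.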
@@ -132,7 +132,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
 			}

 			reply.Index, reply.Nodes = index, nodes
-			return nil
+			return c.srv.sortByDistanceFrom(args.Source, reply.Nodes)
 		})
 }
@@ -189,7 +189,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
 			return err
 		}
 		reply.Index, reply.ServiceNodes = index, services
-		return c.srv.filterACL(args.Token, reply)
+		if err := c.srv.filterACL(args.Token, reply); err != nil {
+			return err
+		}
+		return c.srv.sortByDistanceFrom(args.Source, reply.ServiceNodes)
 	})

 	// Provide some metrics
@@ -456,6 +456,95 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
 	}
 }

+func TestCatalogListNodes_DistanceSort(t *testing.T) {
+	dir1, s1 := testServer(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	client := rpcClient(t, s1)
+	defer client.Close()
+
+	// Add a few nodes.
+	testutil.WaitForLeader(t, client.Call, "dc1")
+	if err := s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if err := s1.fsm.State().EnsureNode(2, structs.Node{"foo", "127.0.0.2"}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if err := s1.fsm.State().EnsureNode(3, structs.Node{"bar", "127.0.0.3"}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if err := s1.fsm.State().EnsureNode(4, structs.Node{"baz", "127.0.0.4"}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Set all but one of the nodes to known coordinates.
+	updates := []structs.Coordinate{
+		{"foo", generateCoordinate(2 * time.Millisecond)},
+		{"bar", generateCoordinate(5 * time.Millisecond)},
+		{"baz", generateCoordinate(1 * time.Millisecond)},
+	}
+	if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Query with no given source node, should get the natural order from
+	// the index.
+	args := structs.DCSpecificRequest{
+		Datacenter: "dc1",
+	}
+	var out structs.IndexedNodes
+	testutil.WaitForResult(func() (bool, error) {
+		client.Call("Catalog.ListNodes", &args, &out)
+		return len(out.Nodes) == 5, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+	if out.Nodes[0].Node != "aaa" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[1].Node != "bar" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[2].Node != "baz" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[3].Node != "foo" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[4].Node != s1.config.NodeName {
+		t.Fatalf("bad: %v", out)
+	}
+
+	// Query relative to foo, note that there's no known coordinate for the
+	// default-added Serf node nor "aaa" so they will go at the end.
+	args = structs.DCSpecificRequest{
+		Datacenter: "dc1",
+		Source:     structs.QuerySource{Datacenter: "dc1", Node: "foo"},
+	}
+	testutil.WaitForResult(func() (bool, error) {
+		client.Call("Catalog.ListNodes", &args, &out)
+		return len(out.Nodes) == 5, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+	if out.Nodes[0].Node != "foo" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[1].Node != "baz" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[2].Node != "bar" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[3].Node != "aaa" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.Nodes[4].Node != s1.config.NodeName {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
 func BenchmarkCatalogListNodes(t *testing.B) {
 	dir1, s1 := testServer(nil)
 	defer os.RemoveAll(dir1)
@@ -714,6 +803,93 @@ func TestCatalogListServiceNodes(t *testing.T) {
 	}
 }

+func TestCatalogListServiceNodes_DistanceSort(t *testing.T) {
+	dir1, s1 := testServer(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	client := rpcClient(t, s1)
+	defer client.Close()
+
+	args := structs.ServiceSpecificRequest{
+		Datacenter:  "dc1",
+		ServiceName: "db",
+	}
+	var out structs.IndexedServiceNodes
+	err := client.Call("Catalog.ServiceNodes", &args, &out)
+	if err == nil || err.Error() != "No cluster leader" {
+		t.Fatalf("err: %v", err)
+	}
+
+	testutil.WaitForLeader(t, client.Call, "dc1")
+
+	// Add a few nodes for the associated services.
+	s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"})
+	s1.fsm.State().EnsureService(2, "aaa", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000})
+	s1.fsm.State().EnsureNode(3, structs.Node{"foo", "127.0.0.2"})
+	s1.fsm.State().EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.2", 5000})
+	s1.fsm.State().EnsureNode(5, structs.Node{"bar", "127.0.0.3"})
+	s1.fsm.State().EnsureService(6, "bar", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.3", 5000})
+	s1.fsm.State().EnsureNode(7, structs.Node{"baz", "127.0.0.4"})
+	s1.fsm.State().EnsureService(8, "baz", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.4", 5000})
+
+	// Set all but one of the nodes to known coordinates.
+	updates := []structs.Coordinate{
+		{"foo", generateCoordinate(2 * time.Millisecond)},
+		{"bar", generateCoordinate(5 * time.Millisecond)},
+		{"baz", generateCoordinate(1 * time.Millisecond)},
+	}
+	if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Query with no given source node, should get the natural order from
+	// the index.
+	if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if len(out.ServiceNodes) != 4 {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[0].Node != "aaa" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[1].Node != "foo" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[2].Node != "bar" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[3].Node != "baz" {
+		t.Fatalf("bad: %v", out)
+	}
+
+	// Query relative to foo, note that there's no known coordinate for "aaa"
+	// so it will go at the end.
+	args = structs.ServiceSpecificRequest{
+		Datacenter:  "dc1",
+		ServiceName: "db",
+		Source:      structs.QuerySource{Datacenter: "dc1", Node: "foo"},
+	}
+	if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if len(out.ServiceNodes) != 4 {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[0].Node != "foo" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[1].Node != "baz" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[2].Node != "bar" {
+		t.Fatalf("bad: %v", out)
+	}
+	if out.ServiceNodes[3].Node != "aaa" {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
 func TestCatalogNodeServices(t *testing.T) {
 	dir1, s1 := testServer(t)
 	defer os.RemoveAll(dir1)
@@ -279,9 +279,9 @@ func DefaultConfig() *Config {
 		// These are tuned to provide a total throughput of 128 updates
 		// per second. If you update these, you should update the client-
 		// side SyncCoordinateRateTarget parameter accordingly.
-		CoordinateUpdatePeriod:      5 * time.Second,
-		CoordinateUpdateBatchSize:   128,
-		CoordinateUpdateMaxBatches:  5,
+		CoordinateUpdatePeriod:     5 * time.Second,
+		CoordinateUpdateBatchSize:  128,
+		CoordinateUpdateMaxBatches: 5,
 	}

 	// Increase our reap interval to 3 days instead of 24h.
@@ -24,7 +24,7 @@ type Coordinate struct {
 // NewCoordinate returns a new Coordinate endpoint.
 func NewCoordinate(srv *Server) *Coordinate {
 	c := &Coordinate{
-		srv: srv,
+		srv:     srv,
 		updates: make(map[string]*coordinate.Coordinate),
 	}
@@ -60,7 +60,7 @@ func (c *Coordinate) batchApplyUpdates() error {
 	limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
 	size := len(pending)
 	if size > limit {
-		c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size - limit)
+		c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size-limit)
 		size = limit
 	}

@@ -68,7 +68,7 @@ func (c *Coordinate) batchApplyUpdates() error {
 	// batches.
 	updates := make([]structs.Coordinate, size)
 	i := 0
-	for node, coord := range(pending) {
+	for node, coord := range pending {
 		if !(i < size) {
 			break
 		}
@@ -117,7 +117,7 @@ func TestCoordinate_Update(t *testing.T) {
 	// Now spam some coordinate updates and make sure it starts throwing
 	// them away if they exceed the batch allowance. Note that we have to
 	// make unique names since these are held in a map by node name.
-	spamLen := s1.config.CoordinateUpdateBatchSize * s1.config.CoordinateUpdateMaxBatches + 1
+	spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
 	for i := 0; i < spamLen; i++ {
 		arg1.Node = fmt.Sprintf("bogusnode%d", i)
 		arg1.Coord = generateRandomCoordinate()
@@ -384,7 +384,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
 	}

 	coord := generateRandomCoordinate()
-	coords := []structs.Coordinate {
+	coords := []structs.Coordinate{
 		structs.Coordinate{"foo", coord},
 	}
 	fsm.state.CoordinateBatchUpdate(13, coords)
@@ -0,0 +1,139 @@
+package consul
+
+import (
+	"fmt"
+	"math"
+	"sort"
+
+	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/serf/coordinate"
+)
+
+// computeDistance returns the distance between the two network coordinates in
+// seconds. If either of the coordinates is nil then this will return positive
+// infinity.
+func computeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
+	if a == nil || b == nil {
+		return math.Inf(1.0)
+	}
+
+	return a.DistanceTo(b).Seconds()
+}
+
+// nodeSorter takes a list of nodes and a parallel vector of distances and
+// implements sort.Interface, keeping both structures coherent and sorting by
+// distance.
+type nodeSorter struct {
+	Nodes structs.Nodes
+	Vec   []float64
+}
+
+// newNodeSorter returns a new sorter for the given source coordinate and set of
+// nodes.
+func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (sort.Interface, error) {
+	state := s.fsm.State()
+	vec := make([]float64, len(nodes))
+	for i, node := range nodes {
+		_, coord, err := state.CoordinateGet(node.Node)
+		if err != nil {
+			return nil, err
+		}
+		vec[i] = computeDistance(c, coord)
+	}
+	return &nodeSorter{nodes, vec}, nil
+}
+
+// See sort.Interface.
+func (n *nodeSorter) Len() int {
+	return len(n.Nodes)
+}
+
+// See sort.Interface.
+func (n *nodeSorter) Swap(i, j int) {
+	n.Nodes[i], n.Nodes[j] = n.Nodes[j], n.Nodes[i]
+	n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
+}
+
+// See sort.Interface.
+func (n *nodeSorter) Less(i, j int) bool {
+	return n.Vec[i] < n.Vec[j]
+}
+
+// serviceNodeSorter takes a list of service nodes and a parallel vector of
+// distances and implements sort.Interface, keeping both structures coherent and
+// sorting by distance.
+type serviceNodeSorter struct {
+	Nodes structs.ServiceNodes
+	Vec   []float64
+}
+
+// newServiceNodeSorter returns a new sorter for the given source coordinate and
+// set of service nodes.
+func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.ServiceNodes) (sort.Interface, error) {
+	state := s.fsm.State()
+	vec := make([]float64, len(nodes))
+	for i, node := range nodes {
+		_, coord, err := state.CoordinateGet(node.Node)
+		if err != nil {
+			return nil, err
+		}
+		vec[i] = computeDistance(c, coord)
+	}
+	return &serviceNodeSorter{nodes, vec}, nil
+}
+
+// See sort.Interface.
+func (n *serviceNodeSorter) Len() int {
+	return len(n.Nodes)
+}
+
+// See sort.Interface.
+func (n *serviceNodeSorter) Swap(i, j int) {
+	n.Nodes[i], n.Nodes[j] = n.Nodes[j], n.Nodes[i]
+	n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
+}
+
+// See sort.Interface.
+func (n *serviceNodeSorter) Less(i, j int) bool {
+	return n.Vec[i] < n.Vec[j]
+}
+
+// newSorterByDistanceFrom returns a sorter for the given type.
+func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interface{}) (sort.Interface, error) {
+	switch v := subj.(type) {
+	case structs.Nodes:
+		return s.newNodeSorter(c, v)
+	case structs.ServiceNodes:
+		return s.newServiceNodeSorter(c, v)
+	default:
+		panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj))
+	}
+}
+
+// sortByDistanceFrom is used to sort results from our service catalog based on the
+// distance (RTT) from the given source node.
+func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}) error {
+	// We can't compare coordinates across DCs.
+	if source.Datacenter != s.config.Datacenter {
+		return nil
+	}
+
+	// There won't always be a coordinate for the source node. If there's not
+	// one then we can bail out because there's no meaning for the sort.
+	state := s.fsm.State()
+	_, coord, err := state.CoordinateGet(source.Node)
+	if err != nil {
+		return err
+	}
+	if coord == nil {
+		return nil
+	}
+
+	// Do the Dew!
+	sorter, err := s.newSorterByDistanceFrom(coord, subj)
+	if err != nil {
+		return err
+	}
+	sort.Stable(sorter)
+	return nil
+}
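One detail worth calling out in the file above: computeDistance maps a missing coordinate to positive infinity, and sortByDistanceFrom finishes with sort.Stable, so nodes with no known coordinate always sink to the end while equally distant nodes keep their prior relative order. A small self-contained sketch of that interaction, using hypothetical names rather than the types from the commit:

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// byDistance mirrors the nodeSorter pattern: names and a parallel distance
// vector, kept coherent while sorting by distance.
type byDistance struct {
	names []string
	vec   []float64
}

func (b *byDistance) Len() int           { return len(b.names) }
func (b *byDistance) Less(i, j int) bool { return b.vec[i] < b.vec[j] }
func (b *byDistance) Swap(i, j int) {
	b.names[i], b.names[j] = b.names[j], b.names[i]
	b.vec[i], b.vec[j] = b.vec[j], b.vec[i]
}

func main() {
	// "zed" has no coordinate, so it gets +Inf and sorts to the end; the
	// stable sort preserves the input order of the tied "bar" and "baz".
	s := &byDistance{
		names: []string{"zed", "foo", "bar", "baz"},
		vec:   []float64{math.Inf(1), 0.002, 0.001, 0.001},
	}
	sort.Stable(s)
	fmt.Println(s.names) // [bar baz foo zed]
}
```

The tests in the next file rely on exactly this pair of properties.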
@@ -0,0 +1,239 @@
+package consul
+
+import (
+	"net/rpc"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/serf/coordinate"
+)
+
+// generateCoordinate creates a new coordinate with the given distance from the
+// origin.
+func generateCoordinate(rtt time.Duration) *coordinate.Coordinate {
+	coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
+	coord.Vec[0] = rtt.Seconds()
+	return coord
+}
+
+// verifyNodeSort makes sure the order of the nodes in the slice is the same as
+// the expected order, expressed as a comma-separated string.
+func verifyNodeSort(t *testing.T, nodes structs.Nodes, expected string) {
+	vec := make([]string, len(nodes))
+	for i, node := range nodes {
+		vec[i] = node.Node
+	}
+	actual := strings.Join(vec, ",")
+	if actual != expected {
+		t.Fatalf("bad sort: %s != %s", actual, expected)
+	}
+}
+
+// verifyServiceNodeSort makes sure the order of the nodes in the slice is the
+// same as the expected order, expressed as a comma-separated string.
+func verifyServiceNodeSort(t *testing.T, nodes structs.ServiceNodes, expected string) {
+	vec := make([]string, len(nodes))
+	for i, node := range nodes {
+		vec[i] = node.Node
+	}
+	actual := strings.Join(vec, ",")
+	if actual != expected {
+		t.Fatalf("bad sort: %s != %s", actual, expected)
+	}
+}
+
+// seedCoordinates uses the client to set up a set of nodes with a specific
+// set of distances from the origin. We also include the server so that we
+// can wait for the coordinates to get committed to the Raft log.
+//
+// Here's the layout of the nodes:
+//
+//      node3 node2 node5                      node4       node1
+//  |     |     |     |     |     |     |     |     |     |     |
+//  0     1     2     3     4     5     6     7     8     9     10  (ms)
+//
+func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) {
+	updates := []structs.CoordinateUpdateRequest{
+		structs.CoordinateUpdateRequest{
+			Datacenter: "dc1",
+			Node:       "node1",
+			Coord:      generateCoordinate(10 * time.Millisecond),
+		},
+		structs.CoordinateUpdateRequest{
+			Datacenter: "dc1",
+			Node:       "node2",
+			Coord:      generateCoordinate(2 * time.Millisecond),
+		},
+		structs.CoordinateUpdateRequest{
+			Datacenter: "dc1",
+			Node:       "node3",
+			Coord:      generateCoordinate(1 * time.Millisecond),
+		},
+		structs.CoordinateUpdateRequest{
+			Datacenter: "dc1",
+			Node:       "node4",
+			Coord:      generateCoordinate(8 * time.Millisecond),
+		},
+		structs.CoordinateUpdateRequest{
+			Datacenter: "dc1",
+			Node:       "node5",
+			Coord:      generateCoordinate(3 * time.Millisecond),
+		},
+	}
+
+	// Apply the updates and wait a while for the batch to get committed to
+	// the Raft log.
+	for _, update := range updates {
+		var out struct{}
+		if err := client.Call("Coordinate.Update", &update, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+	time.Sleep(2 * server.config.CoordinateUpdatePeriod)
+}
+
+func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
+	dir, server := testServer(t)
+	defer os.RemoveAll(dir)
+	defer server.Shutdown()
+
+	client := rpcClient(t, server)
+	defer client.Close()
+	testutil.WaitForLeader(t, client.Call, "dc1")
+	seedCoordinates(t, client, server)
+
+	nodes := structs.Nodes{
+		structs.Node{Node: "apple"},
+		structs.Node{Node: "node1"},
+		structs.Node{Node: "node2"},
+		structs.Node{Node: "node3"},
+		structs.Node{Node: "node4"},
+		structs.Node{Node: "node5"},
+	}
+
+	// The zero value for the source should not trigger any sorting.
+	var source structs.QuerySource
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
+
+	// Same for a source in some other DC.
+	source.Node = "node1"
+	source.Datacenter = "dc2"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
+
+	// Same for a source node in our DC that we have no coordinate for.
+	source.Node = "apple"
+	source.Datacenter = "dc1"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
+
+	// Now sort relative to node1, note that apple doesn't have any
+	// seeded coordinate info so it should end up at the end, despite
+	// its lexical hegemony.
+	source.Node = "node1"
+	source.Datacenter = "dc1"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
+
+	// Try another sort from node2. Note that node5 and node3 are the
+	// same distance away so the stable sort should preserve the order
+	// they were in from the previous sort.
+	source.Node = "node2"
+	source.Datacenter = "dc1"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
+
+	// Let's exercise the stable sort explicitly to make sure we didn't
+	// just get lucky.
+	nodes[1], nodes[2] = nodes[2], nodes[1]
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
+}
+
+func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
+	dir, server := testServer(t)
+	defer os.RemoveAll(dir)
+	defer server.Shutdown()
+
+	client := rpcClient(t, server)
+	defer client.Close()
+	testutil.WaitForLeader(t, client.Call, "dc1")
+	seedCoordinates(t, client, server)
+
+	nodes := structs.ServiceNodes{
+		structs.ServiceNode{Node: "apple"},
+		structs.ServiceNode{Node: "node1"},
+		structs.ServiceNode{Node: "node2"},
+		structs.ServiceNode{Node: "node3"},
+		structs.ServiceNode{Node: "node4"},
+		structs.ServiceNode{Node: "node5"},
+	}
+
+	// The zero value for the source should not trigger any sorting.
+	var source structs.QuerySource
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
+
+	// Same for a source in some other DC.
+	source.Node = "node1"
+	source.Datacenter = "dc2"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
+
+	// Same for a source node in our DC that we have no coordinate for.
+	source.Node = "apple"
+	source.Datacenter = "dc1"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
+
+	// Now sort relative to node1, note that apple doesn't have any
+	// seeded coordinate info so it should end up at the end, despite
+	// its lexical hegemony.
+	source.Node = "node1"
+	source.Datacenter = "dc1"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyServiceNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
+
+	// Try another sort from node2. Note that node5 and node3 are the
+	// same distance away so the stable sort should preserve the order
+	// they were in from the previous sort.
+	source.Node = "node2"
+	source.Datacenter = "dc1"
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyServiceNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
+
+	// Let's exercise the stable sort explicitly to make sure we didn't
+	// just get lucky.
+	nodes[1], nodes[2] = nodes[2], nodes[1]
+	if err := server.sortByDistanceFrom(source, nodes); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
+}
@@ -68,7 +68,7 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
 	config.ReconcileInterval = 100 * time.Millisecond

 	config.DisableCoordinates = false
-	config.CoordinateUpdatePeriod = 0 * time.Millisecond
+	config.CoordinateUpdatePeriod = 100 * time.Millisecond
 	return dir, config
 }
@@ -184,9 +184,18 @@ func (r *DeregisterRequest) RequestDatacenter() string {
 	return r.Datacenter
 }

+// QuerySource is used to pass along information about the source node
+// in queries so that we can adjust the response based on its network
+// coordinates.
+type QuerySource struct {
+	Datacenter string
+	Node       string
+}
+
 // DCSpecificRequest is used to query about a specific DC
 type DCSpecificRequest struct {
 	Datacenter string
+	Source     QuerySource
 	QueryOptions
 }

@@ -200,6 +209,7 @@ type ServiceSpecificRequest struct {
 	ServiceName string
 	ServiceTag  string
 	TagFilter   bool // Controls tag filtering
+	Source      QuerySource
 	QueryOptions
 }
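Putting the new Source field together with the RPC endpoints above, a caller-side sketch might look like this. The nearestNodes helper and the hard-coded datacenter are hypothetical; the request/response types and the "Catalog.ListNodes" call mirror the tests in this commit:

```go
package consul

import (
	"net/rpc"

	"github.com/hashicorp/consul/consul/structs"
)

// nearestNodes is a hypothetical helper showing how an RPC caller would
// request a distance-sorted node list using the new Source field.
func nearestNodes(client *rpc.Client, node string) (structs.Nodes, error) {
	args := structs.DCSpecificRequest{
		Datacenter: "dc1",
		Source:     structs.QuerySource{Datacenter: "dc1", Node: node},
	}
	var out structs.IndexedNodes
	if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
		return nil, err
	}
	// Nodes come back ordered by estimated RTT from the source node;
	// nodes with no known coordinate sort to the end.
	return out.Nodes, nil
}
```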
@@ -175,6 +175,10 @@ This endpoint is hit with a GET and returns the nodes registered
 in a given DC. By default, the datacenter of the agent is queried;
 however, the dc can be provided using the "?dc=" query parameter.

+Adding the optional "?near=" parameter with a node name will sort
+the node list in ascending order based on the estimated round trip
+time from that node.
+
 It returns a JSON body like this:

 ```javascript

@@ -226,6 +230,10 @@ The service being queried must be provided on the path. By default
 all nodes in that service are returned. However, the list can be filtered
 by tag using the "?tag=" query parameter.

+Adding the optional "?near=" parameter with a node name will sort
+the node list in ascending order based on the estimated round trip
+time from that node.
+
 It returns a JSON body like this:

 ```javascript