diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 559593f8e9..447822a875 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -60,6 +60,7 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Setup the request args := structs.DCSpecificRequest{} + s.parseSource(req, &args.Source) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } @@ -90,6 +91,7 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.ServiceSpecificRequest{} + s.parseSource(req, &args.Source) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } diff --git a/command/agent/config.go b/command/agent/config.go index 7e68c294ee..dd3afdc421 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -476,12 +476,12 @@ func DefaultConfig() *Config { DNSConfig: DNSConfig{ MaxStale: 5 * time.Second, }, - StatsitePrefix: "consul", - SyslogFacility: "LOCAL0", - Protocol: consul.ProtocolVersionMax, - CheckUpdateInterval: 5 * time.Minute, - AEInterval: time.Minute, - DisableCoordinates: false, + StatsitePrefix: "consul", + SyslogFacility: "LOCAL0", + Protocol: consul.ProtocolVersionMax, + CheckUpdateInterval: 5 * time.Minute, + AEInterval: time.Minute, + DisableCoordinates: false, // SyncCoordinateRateTarget is set based on the rate that we want // the server to handle as an aggregate across the entire cluster. @@ -490,11 +490,11 @@ func DefaultConfig() *Config { SyncCoordinateRateTarget: 64.0, // updates / second SyncCoordinateIntervalMin: 15 * time.Second, - ACLTTL: 30 * time.Second, - ACLDownPolicy: "extend-cache", - ACLDefaultPolicy: "allow", - RetryInterval: 30 * time.Second, - RetryIntervalWan: 30 * time.Second, + ACLTTL: 30 * time.Second, + ACLDownPolicy: "extend-cache", + ACLDefaultPolicy: "allow", + RetryInterval: 30 * time.Second, + RetryIntervalWan: 30 * time.Second, } } diff --git a/command/agent/http.go b/command/agent/http.go index 02c20b3a43..b217c52637 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -485,6 +485,16 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) { *token = s.agent.config.ACLToken } +// parseSource is used to parse the ?near= query parameter, used for +// sorting by RTT based on a source node. We set the source's DC to the target +// DC in the request, if given, or else the agent's DC. +func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) { + s.parseDC(req, &source.Datacenter) + if node := req.URL.Query().Get("near"); node != "" { + source.Node = node + } +} + // parse is a convenience method for endpoints that need // to use both parseWait and parseDC. 
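Taken together, the wiring above means RTT-sorted results are one query parameter away for clients. A minimal client-side sketch, assuming a local agent on the default HTTP port and a registered node named "web-01" (both are placeholders, not taken from the diff):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

// Ask the catalog for all nodes, sorted by estimated round trip time from
// the node "web-01". The agent address and node name are assumptions made
// for the sake of the example.
func main() {
	resp, err := http.Get("http://127.0.0.1:8500/v1/catalog/nodes?near=web-01")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// The JSON list comes back in ascending RTT order from web-01; nodes
	// without a network coordinate yet sort to the end.
	fmt.Println(string(body))
}
```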
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index a11c000266..ff10a1fff5 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -337,6 +337,51 @@ func testPrettyPrint(pretty string, t *testing.T) { } } +func TestParseSource(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Default is agent's DC and no node (since the user didn't care, + // just give them the cheapest possible query). + req, err := http.NewRequest("GET", + "/v1/catalog/nodes", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + source := structs.QuerySource{} + srv.parseSource(req, &source) + if source.Datacenter != "dc1" || source.Node != "" { + t.Fatalf("bad: %v", source) + } + + // Adding the ?near= parameter should set that node as the source. + req, err = http.NewRequest("GET", + "/v1/catalog/nodes?near=bob", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + source = structs.QuerySource{} + srv.parseSource(req, &source) + if source.Datacenter != "dc1" || source.Node != "bob" { + t.Fatalf("bad: %v", source) + } + + // We should follow whatever dc parameter was given so that the node is + // looked up correctly on the receiving end. + req, err = http.NewRequest("GET", + "/v1/catalog/nodes?near=bob&dc=foo", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + source = structs.QuerySource{} + srv.parseSource(req, &source) + if source.Datacenter != "foo" || source.Node != "bob" { + t.Fatalf("bad: %v", source) + } +} + func TestParseWait(t *testing.T) { resp := httptest.NewRecorder() var b structs.QueryOptions diff --git a/command/agent/util.go b/command/agent/util.go index 075cf0c730..5f07bb6851 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -27,8 +27,8 @@ const ( aeScaleThreshold = 128 ) -// aeScale is used to scale the time interval at which anti-entropy and coordinate -// updates take place. It is used to prevent saturation as the cluster size grows. +// aeScale is used to scale the time interval at which anti-entropy updates take +// place. It is used to prevent saturation as the cluster size grows.
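Only aeScale's doc comment and opening lines appear in this diff. As a rough guide to the behavior the comment promises, here is a sketch that assumes the interval grows with the logarithm of the cluster size once it crosses the threshold; the shipped body lies outside the hunk's context, so treat this as illustrative rather than authoritative:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const aeScaleThreshold = 128

// aeScaleSketch scales an interval with cluster size, assuming the
// logarithmic policy described above: below the threshold the interval is
// untouched; past it, every doubling of the cluster adds one more multiple
// of the base interval.
func aeScaleSketch(interval time.Duration, n int) time.Duration {
	if n <= aeScaleThreshold {
		return interval
	}
	multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(float64(aeScaleThreshold))) + 1.0
	return time.Duration(multiplier) * interval
}

func main() {
	fmt.Println(aeScaleSketch(time.Minute, 100)) // 1m0s (under the threshold)
	fmt.Println(aeScaleSketch(time.Minute, 256)) // 2m0s
	fmt.Println(aeScaleSketch(time.Minute, 512)) // 3m0s
}
```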
func aeScale(interval time.Duration, n int) time.Duration { // Don't scale until we cross the threshold if n <= aeScaleThreshold { diff --git a/command/agent/util_test.go b/command/agent/util_test.go index 9466ced442..e87e28850d 100644 --- a/command/agent/util_test.go +++ b/command/agent/util_test.go @@ -25,7 +25,7 @@ func TestAEScale(t *testing.T) { } func TestRateScaledInterval(t *testing.T) { - min := 1*time.Second + min := 1 * time.Second rate := 200.0 if v := rateScaledInterval(rate, min, 0); v != min { t.Fatalf("Bad: %v", v) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 28aa4c8138..4e7bab650e 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -132,7 +132,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde } reply.Index, reply.Nodes = index, nodes - return nil + return c.srv.sortByDistanceFrom(args.Source, reply.Nodes) }) } @@ -189,7 +189,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return err } reply.Index, reply.ServiceNodes = index, services - return c.srv.filterACL(args.Token, reply) + if err := c.srv.filterACL(args.Token, reply); err != nil { + return err + } + return c.srv.sortByDistanceFrom(args.Source, reply.ServiceNodes) }) // Provide some metrics diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index b1ec44c36a..efc418fbb4 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -456,6 +456,95 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) { } } +func TestCatalogListNodes_DistanceSort(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Add four nodes. + testutil.WaitForLeader(t, client.Call, "dc1") + if err := s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.State().EnsureNode(2, structs.Node{"foo", "127.0.0.2"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.State().EnsureNode(3, structs.Node{"bar", "127.0.0.3"}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.State().EnsureNode(4, structs.Node{"baz", "127.0.0.4"}); err != nil { + t.Fatalf("err: %v", err) + } + + // Set all but one of the nodes to known coordinates. + updates := []structs.Coordinate{ + {"foo", generateCoordinate(2 * time.Millisecond)}, + {"bar", generateCoordinate(5 * time.Millisecond)}, + {"baz", generateCoordinate(1 * time.Millisecond)}, + } + if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil { + t.Fatalf("err: %v", err) + } + + // Query with no given source node, should get the natural order from + // the index. + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.IndexedNodes + testutil.WaitForResult(func() (bool, error) { + client.Call("Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 5, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + if out.Nodes[0].Node != "aaa" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[1].Node != "bar" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[2].Node != "baz" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[3].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[4].Node != s1.config.NodeName { + t.Fatalf("bad: %v", out) + } + + // Query relative to foo, note that there's no known coordinate for the + // default-added Serf node nor "aaa" so they will go at the end.
+ args = structs.DCSpecificRequest{ + Datacenter: "dc1", + Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"}, + } + testutil.WaitForResult(func() (bool, error) { + client.Call("Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 5, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + if out.Nodes[0].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[1].Node != "baz" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[2].Node != "bar" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[3].Node != "aaa" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[4].Node != s1.config.NodeName { + t.Fatalf("bad: %v", out) + } +} + func BenchmarkCatalogListNodes(t *testing.B) { dir1, s1 := testServer(nil) defer os.RemoveAll(dir1) @@ -714,6 +803,93 @@ func TestCatalogListServiceNodes(t *testing.T) { } } +func TestCatalogListServiceNodes_DistanceSort(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "db", + } + var out structs.IndexedServiceNodes + err := client.Call("Catalog.ServiceNodes", &args, &out) + if err == nil || err.Error() != "No cluster leader" { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, client.Call, "dc1") + + // Add a few nodes for the associated services. + s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"}) + s1.fsm.State().EnsureService(2, "aaa", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000}) + s1.fsm.State().EnsureNode(3, structs.Node{"foo", "127.0.0.2"}) + s1.fsm.State().EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.2", 5000}) + s1.fsm.State().EnsureNode(5, structs.Node{"bar", "127.0.0.3"}) + s1.fsm.State().EnsureService(6, "bar", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.3", 5000}) + s1.fsm.State().EnsureNode(7, structs.Node{"baz", "127.0.0.4"}) + s1.fsm.State().EnsureService(8, "baz", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.4", 5000}) + + // Set all but one of the nodes to known coordinates. + updates := []structs.Coordinate{ + {"foo", generateCoordinate(2 * time.Millisecond)}, + {"bar", generateCoordinate(5 * time.Millisecond)}, + {"baz", generateCoordinate(1 * time.Millisecond)}, + } + if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil { + t.Fatalf("err: %v", err) + } + + // Query with no given source node, should get the natural order from + // the index. + if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.ServiceNodes) != 4 { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[0].Node != "aaa" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[1].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[2].Node != "bar" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[3].Node != "baz" { + t.Fatalf("bad: %v", out) + } + + // Query relative to foo, note that there's no known coordinate for "aaa" + // so it will go at the end. 
+ args = structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "db", + Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"}, + } + if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.ServiceNodes) != 4 { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[0].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[1].Node != "baz" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[2].Node != "bar" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[3].Node != "aaa" { + t.Fatalf("bad: %v", out) + } +} + func TestCatalogNodeServices(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/config.go b/consul/config.go index b72f8807db..6336933a54 100644 --- a/consul/config.go +++ b/consul/config.go @@ -279,9 +279,9 @@ func DefaultConfig() *Config { // These are tuned to provide a total throughput of 128 updates // per second. If you update these, you should update the client- // side SyncCoordinateRateTarget parameter accordingly. - CoordinateUpdatePeriod: 5 * time.Second, - CoordinateUpdateBatchSize: 128, - CoordinateUpdateMaxBatches: 5, + CoordinateUpdatePeriod: 5 * time.Second, + CoordinateUpdateBatchSize: 128, + CoordinateUpdateMaxBatches: 5, } // Increase our reap interval to 3 days instead of 24h. diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index ddb8ecb3bf..0d2c4325c6 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -24,7 +24,7 @@ type Coordinate struct { // NewCoordinate returns a new Coordinate endpoint. func NewCoordinate(srv *Server) *Coordinate { c := &Coordinate{ - srv: srv, + srv: srv, updates: make(map[string]*coordinate.Coordinate), } @@ -60,7 +60,7 @@ func (c *Coordinate) batchApplyUpdates() error { limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches size := len(pending) if size > limit { - c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size - limit) + c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size-limit) size = limit } @@ -68,7 +68,7 @@ func (c *Coordinate) batchApplyUpdates() error { // batches. updates := make([]structs.Coordinate, size) i := 0 - for node, coord := range(pending) { + for node, coord := range pending { if !(i < size) { break } diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index f5fc61ba47..22376a305f 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -117,7 +117,7 @@ func TestCoordinate_Update(t *testing.T) { // Now spam some coordinate updates and make sure it starts throwing // them away if they exceed the batch allowance. Node we have to make // unique names since these are held in map by node name. 
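The allowance the test overflows is simply the product of the knobs shown earlier in this diff in consul/config.go. A standalone sketch of the arithmetic (the variable names mirror the config fields):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	period := 5 * time.Second // CoordinateUpdatePeriod
	batchSize := 128          // CoordinateUpdateBatchSize
	maxBatches := 5           // CoordinateUpdateMaxBatches

	// Updates beyond this allowance are discarded with a warning.
	limit := batchSize * maxBatches
	perSecond := float64(limit) / period.Seconds()
	fmt.Printf("%d updates per period, %.0f updates/sec aggregate\n",
		limit, perSecond) // 640 updates per period, 128 updates/sec aggregate
}
```

The spamLen in the next hunk is exactly this allowance plus one, which is what forces the discard path.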
- spamLen := s1.config.CoordinateUpdateBatchSize * s1.config.CoordinateUpdateMaxBatches + 1 + spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1 for i := 0; i < spamLen; i++ { arg1.Node = fmt.Sprintf("bogusnode%d", i) arg1.Coord = generateRandomCoordinate() diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 8764ee041e..2f7dca6c4e 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -384,7 +384,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } coord := generateRandomCoordinate() - coords := []structs.Coordinate { + coords := []structs.Coordinate{ structs.Coordinate{"foo", coord}, } fsm.state.CoordinateBatchUpdate(13, coords) diff --git a/consul/rtt.go b/consul/rtt.go new file mode 100644 index 0000000000..3cfe90fb49 --- /dev/null +++ b/consul/rtt.go @@ -0,0 +1,139 @@ +package consul + +import ( + "fmt" + "math" + "sort" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/coordinate" +) + +// computeDistance returns the distance between the two network coordinates in +// seconds. If either of the coordinates is nil then this will return positive +// infinity. +func computeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 { + if a == nil || b == nil { + return math.Inf(1.0) + } + + return a.DistanceTo(b).Seconds() +} + +// nodeSorter takes a list of nodes and a parallel vector of distances and +// implements sort.Interface, keeping both structures coherent and sorting by +// distance. +type nodeSorter struct { + Nodes structs.Nodes + Vec []float64 +} + +// newNodeSorter returns a new sorter for the given source coordinate and set of +// nodes. +func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (sort.Interface, error) { + state := s.fsm.State() + vec := make([]float64, len(nodes)) + for i, node := range nodes { + _, coord, err := state.CoordinateGet(node.Node) + if err != nil { + return nil, err + } + vec[i] = computeDistance(c, coord) + } + return &nodeSorter{nodes, vec}, nil +} + +// See sort.Interface. +func (n *nodeSorter) Len() int { + return len(n.Nodes) +} + +// See sort.Interface. +func (n *nodeSorter) Swap(i, j int) { + n.Nodes[i], n.Nodes[j] = n.Nodes[j], n.Nodes[i] + n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i] +} + +// See sort.Interface. +func (n *nodeSorter) Less(i, j int) bool { + return n.Vec[i] < n.Vec[j] +} + +// serviceNodeSorter takes a list of service nodes and a parallel vector of +// distances and implements sort.Interface, keeping both structures coherent and +// sorting by distance. +type serviceNodeSorter struct { + Nodes structs.ServiceNodes + Vec []float64 +} + +// newServiceNodeSorter returns a new sorter for the given source coordinate and +// set of service nodes. +func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.ServiceNodes) (sort.Interface, error) { + state := s.fsm.State() + vec := make([]float64, len(nodes)) + for i, node := range nodes { + _, coord, err := state.CoordinateGet(node.Node) + if err != nil { + return nil, err + } + vec[i] = computeDistance(c, coord) + } + return &serviceNodeSorter{nodes, vec}, nil +} + +// See sort.Interface. +func (n *serviceNodeSorter) Len() int { + return len(n.Nodes) +} + +// See sort.Interface. +func (n *serviceNodeSorter) Swap(i, j int) { + n.Nodes[i], n.Nodes[j] = n.Nodes[j], n.Nodes[i] + n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i] +} + +// See sort.Interface. 
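Both sorters follow the same parallel-vector pattern: Swap has to move a node and its distance together, or later comparisons would read stale values. A standalone reduction of the idea with plain strings in place of the catalog structs (hypothetical data, not diff code), which also shows how the positive infinity that computeDistance returns for a missing coordinate sinks a node to the end:

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// byDistance keeps a name list and a distance vector coherent while
// sorting, just like nodeSorter above.
type byDistance struct {
	names []string
	vec   []float64
}

func (s *byDistance) Len() int           { return len(s.names) }
func (s *byDistance) Less(i, j int) bool { return s.vec[i] < s.vec[j] }
func (s *byDistance) Swap(i, j int) {
	s.names[i], s.names[j] = s.names[j], s.names[i]
	s.vec[i], s.vec[j] = s.vec[j], s.vec[i]
}

func main() {
	// "aaa" has no coordinate, so it gets +Inf and loses every Less
	// comparison against a finite RTT.
	s := &byDistance{
		names: []string{"aaa", "foo", "bar", "baz"},
		vec:   []float64{math.Inf(1), 0.002, 0.005, 0.001},
	}
	sort.Stable(s)
	fmt.Println(s.names) // [baz foo bar aaa]
}
```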
+func (n *serviceNodeSorter) Less(i, j int) bool { + return n.Vec[i] < n.Vec[j] +} + +// newSorterByDistanceFrom returns a sorter for the given type. +func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interface{}) (sort.Interface, error) { + switch v := subj.(type) { + case structs.Nodes: + return s.newNodeSorter(c, v) + case structs.ServiceNodes: + return s.newServiceNodeSorter(c, v) + default: + panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj)) + } +} + +// sortByDistanceFrom is used to sort results from our service catalog based on the +// distance (RTT) from the given source node. +func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}) error { + // We can't compare coordinates across DCs. + if source.Datacenter != s.config.Datacenter { + return nil + } + + // There won't always be a coordinate for the source node. If there's not + // one then we can bail out because there's no meaning for the sort. + state := s.fsm.State() + _, coord, err := state.CoordinateGet(source.Node) + if err != nil { + return err + } + if coord == nil { + return nil + } + + // Do the Dew! + sorter, err := s.newSorterByDistanceFrom(coord, subj) + if err != nil { + return err + } + sort.Stable(sorter) + return nil +} diff --git a/consul/rtt_test.go b/consul/rtt_test.go new file mode 100644 index 0000000000..0d4ec3bd56 --- /dev/null +++ b/consul/rtt_test.go @@ -0,0 +1,239 @@ +package consul + +import ( + "net/rpc" + "os" + "strings" + "testing" + "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/serf/coordinate" +) + +// generateCoordinate creates a new coordinate with the given distance from the +// origin. +func generateCoordinate(rtt time.Duration) *coordinate.Coordinate { + coord := coordinate.NewCoordinate(coordinate.DefaultConfig()) + coord.Vec[0] = rtt.Seconds() + return coord +} + +// verifyNodeSort makes sure the order of the nodes in the slice is the same as +// the expected order, expressed as a comma-separated string. +func verifyNodeSort(t *testing.T, nodes structs.Nodes, expected string) { + vec := make([]string, len(nodes)) + for i, node := range nodes { + vec[i] = node.Node + } + actual := strings.Join(vec, ",") + if actual != expected { + t.Fatalf("bad sort: %s != %s", actual, expected) + } +} + +// verifyServiceNodeSort makes sure the order of the nodes in the slice is the +// same as the expected order, expressed as a comma-separated string. +func verifyServiceNodeSort(t *testing.T, nodes structs.ServiceNodes, expected string) { + vec := make([]string, len(nodes)) + for i, node := range nodes { + vec[i] = node.Node + } + actual := strings.Join(vec, ",") + if actual != expected { + t.Fatalf("bad sort: %s != %s", actual, expected) + } +} + +// seedCoordinates uses the client to set up a set of nodes with a specific +// set of distances from the origin. We also include the server so that we +// can wait for the coordinates to get committed to the Raft log. 
+// +// Here's the layout of the nodes: +// +// node3 node2 node5 node4 node1 +// | | | | | | | | | | | +// 0 1 2 3 4 5 6 7 8 9 10 (ms) +// +func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) { + updates := []structs.CoordinateUpdateRequest{ + structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node1", + Coord: generateCoordinate(10 * time.Millisecond), + }, + structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node2", + Coord: generateCoordinate(2 * time.Millisecond), + }, + structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node3", + Coord: generateCoordinate(1 * time.Millisecond), + }, + structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node4", + Coord: generateCoordinate(8 * time.Millisecond), + }, + structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node5", + Coord: generateCoordinate(3 * time.Millisecond), + }, + } + + // Apply the updates and wait a while for the batch to get committed to + // the Raft log. + for _, update := range updates { + var out struct{} + if err := client.Call("Coordinate.Update", &update, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + time.Sleep(2 * server.config.CoordinateUpdatePeriod) +} + +func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { + dir, server := testServer(t) + defer os.RemoveAll(dir) + defer server.Shutdown() + + client := rpcClient(t, server) + defer client.Close() + testutil.WaitForLeader(t, client.Call, "dc1") + seedCoordinates(t, client, server) + + nodes := structs.Nodes{ + structs.Node{Node: "apple"}, + structs.Node{Node: "node1"}, + structs.Node{Node: "node2"}, + structs.Node{Node: "node3"}, + structs.Node{Node: "node4"}, + structs.Node{Node: "node5"}, + } + + // The zero value for the source should not trigger any sorting. + var source structs.QuerySource + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") + + // Same for a source in some other DC. + source.Node = "node1" + source.Datacenter = "dc2" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") + + // Same for a source node in our DC that we have no coordinate for. + source.Node = "apple" + source.Datacenter = "dc1" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") + + // Now sort relative to node1, note that apple doesn't have any + // seeded coordinate info so it should end up at the end, despite + // its lexical hegemony. + source.Node = "node1" + source.Datacenter = "dc1" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple") + + // Try another sort from node2. Note that node5 and node3 are the + // same distance away so the stable sort should preserve the order + // they were in from the previous sort. + source.Node = "node2" + source.Datacenter = "dc1" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple") + + // Let's exercise the stable sort explicitly to make sure we didn't + // just get lucky. 
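Stability is doing real work in these assertions: node3 and node5 sit at the same distance from node2, so only a stable sort makes their relative order deterministic. A tiny standalone check of that guarantee (hypothetical values, using a modern stdlib helper):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// sort.SliceStable never reorders elements that compare equal, so the
	// two equidistant nodes keep their input order.
	type entry struct {
		name string
		rtt  float64 // seconds from the source node
	}
	nodes := []entry{{"node3", 0.001}, {"node5", 0.001}, {"node1", 0.008}}
	sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].rtt < nodes[j].rtt })
	fmt.Println(nodes) // [{node3 0.001} {node5 0.001} {node1 0.008}]
}
```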
+ nodes[1], nodes[2] = nodes[2], nodes[1] + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple") +} + +func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { + dir, server := testServer(t) + defer os.RemoveAll(dir) + defer server.Shutdown() + + client := rpcClient(t, server) + defer client.Close() + testutil.WaitForLeader(t, client.Call, "dc1") + seedCoordinates(t, client, server) + + nodes := structs.ServiceNodes{ + structs.ServiceNode{Node: "apple"}, + structs.ServiceNode{Node: "node1"}, + structs.ServiceNode{Node: "node2"}, + structs.ServiceNode{Node: "node3"}, + structs.ServiceNode{Node: "node4"}, + structs.ServiceNode{Node: "node5"}, + } + + // The zero value for the source should not trigger any sorting. + var source structs.QuerySource + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") + + // Same for a source in some other DC. + source.Node = "node1" + source.Datacenter = "dc2" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") + + // Same for a source node in our DC that we have no coordinate for. + source.Node = "apple" + source.Datacenter = "dc1" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") + + // Now sort relative to node1, note that apple doesn't have any + // seeded coordinate info so it should end up at the end, despite + // its lexical hegemony. + source.Node = "node1" + source.Datacenter = "dc1" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyServiceNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple") + + // Try another sort from node2. Note that node5 and node3 are the + // same distance away so the stable sort should preserve the order + // they were in from the previous sort. + source.Node = "node2" + source.Datacenter = "dc1" + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyServiceNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple") + + // Let's exercise the stable sort explicitly to make sure we didn't + // just get lucky. 
+ nodes[1], nodes[2] = nodes[2], nodes[1] + if err := server.sortByDistanceFrom(source, nodes); err != nil { + t.Fatalf("err: %v", err) + } + verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple") +} diff --git a/consul/server_test.go b/consul/server_test.go index 58737cc96b..a4a3887dc4 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -68,7 +68,7 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) { config.ReconcileInterval = 100 * time.Millisecond config.DisableCoordinates = false - config.CoordinateUpdatePeriod = 0 * time.Millisecond + config.CoordinateUpdatePeriod = 100 * time.Millisecond return dir, config } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 552432a826..069e4bbca3 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -184,9 +184,18 @@ func (r *DeregisterRequest) RequestDatacenter() string { return r.Datacenter } +// QuerySource is used to pass along information about the source node +// in queries so that we can adjust the response based on its network +// coordinates. +type QuerySource struct { + Datacenter string + Node string +} + // DCSpecificRequest is used to query about a specific DC type DCSpecificRequest struct { Datacenter string + Source QuerySource QueryOptions } @@ -200,6 +209,7 @@ type ServiceSpecificRequest struct { ServiceName string ServiceTag string TagFilter bool // Controls tag filtering + Source QuerySource QueryOptions } diff --git a/website/source/docs/agent/http/catalog.html.markdown b/website/source/docs/agent/http/catalog.html.markdown index 734c702b94..554719e5ca 100644 --- a/website/source/docs/agent/http/catalog.html.markdown +++ b/website/source/docs/agent/http/catalog.html.markdown @@ -175,6 +175,10 @@ This endpoint is hit with a GET and returns the nodes registered in a given DC. By default, the datacenter of the agent is queried; however, the dc can be provided using the "?dc=" query parameter. +Adding the optional "?near=" parameter with a node name will sort +the node list in ascending order based on the estimated round trip +time from that node. + It returns a JSON body like this: ```javascript @@ -226,6 +230,10 @@ The service being queried must be provided on the path. By default all nodes in that service are returned. However, the list can be filtered by tag using the "?tag=" query parameter. +Adding the optional "?near=" parameter with a node name will sort +the node list in ascending order based on the estimated round trip +time from that node. + It returns a JSON body like this: ```javascript