mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
Merge pull request #3622 from hashicorp/coordinate-node-endpoint
agent: add /v1/coordianate/node/:node endpoint
This commit is contained in:
commit
c4375d5a47
@ -139,9 +139,6 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if rule != nil && c.srv.config.ACLEnforceVersion8 {
|
if rule != nil && c.srv.config.ACLEnforceVersion8 {
|
||||||
// We don't enforce the sentinel policy here, since at this time
|
|
||||||
// sentinel only applies to creating or updating node or service
|
|
||||||
// info, not updating coordinates.
|
|
||||||
if !rule.NodeWrite(args.Node, nil) {
|
if !rule.NodeWrite(args.Node, nil) {
|
||||||
return acl.ErrPermissionDenied
|
return acl.ErrPermissionDenied
|
||||||
}
|
}
|
||||||
@ -200,3 +197,41 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Node returns the raw coordinates for a single node.
|
||||||
|
func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error {
|
||||||
|
if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
||||||
|
rule, err := c.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if rule != nil && c.srv.config.ACLEnforceVersion8 {
|
||||||
|
if !rule.NodeRead(args.Node) {
|
||||||
|
return acl.ErrPermissionDenied
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.srv.blockingQuery(&args.QueryOptions,
|
||||||
|
&reply.QueryMeta,
|
||||||
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
|
index, nodeCoords, err := state.Coordinate(args.Node, ws)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var coords structs.Coordinates
|
||||||
|
for segment, coord := range nodeCoords {
|
||||||
|
coords = append(coords, &structs.Coordinate{
|
||||||
|
Node: args.Node,
|
||||||
|
Segment: segment,
|
||||||
|
Coord: coord,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
reply.Index, reply.Coordinates = index, coords
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net/rpc"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -50,16 +51,8 @@ func TestCoordinate_Update(t *testing.T) {
|
|||||||
|
|
||||||
// Register some nodes.
|
// Register some nodes.
|
||||||
nodes := []string{"node1", "node2"}
|
nodes := []string{"node1", "node2"}
|
||||||
for _, node := range nodes {
|
if err := registerNodes(nodes, codec); err != nil {
|
||||||
req := structs.RegisterRequest{
|
t.Fatal(err)
|
||||||
Datacenter: "dc1",
|
|
||||||
Node: node,
|
|
||||||
Address: "127.0.0.1",
|
|
||||||
}
|
|
||||||
var reply struct{}
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send an update for the first node.
|
// Send an update for the first node.
|
||||||
@ -86,13 +79,13 @@ func TestCoordinate_Update(t *testing.T) {
|
|||||||
// Make sure the updates did not yet apply because the update period
|
// Make sure the updates did not yet apply because the update period
|
||||||
// hasn't expired.
|
// hasn't expired.
|
||||||
state := s1.fsm.State()
|
state := s1.fsm.State()
|
||||||
c, err := state.Coordinate("node1")
|
_, c, err := state.Coordinate("node1", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
verify.Values(t, "", c, lib.CoordinateSet{})
|
verify.Values(t, "", c, lib.CoordinateSet{})
|
||||||
|
|
||||||
c, err = state.Coordinate("node2")
|
_, c, err = state.Coordinate("node2", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -107,7 +100,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||||||
|
|
||||||
// Wait a while and the updates should get picked up.
|
// Wait a while and the updates should get picked up.
|
||||||
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
||||||
c, err = state.Coordinate("node1")
|
_, c, err = state.Coordinate("node1", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -116,7 +109,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||||||
}
|
}
|
||||||
verify.Values(t, "", c, expected)
|
verify.Values(t, "", c, expected)
|
||||||
|
|
||||||
c, err = state.Coordinate("node2")
|
_, c, err = state.Coordinate("node2", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -155,7 +148,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||||||
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
||||||
numDropped := 0
|
numDropped := 0
|
||||||
for i := 0; i < spamLen; i++ {
|
for i := 0; i < spamLen; i++ {
|
||||||
c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i))
|
_, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -201,16 +194,8 @@ func TestCoordinate_Update_ACLDeny(t *testing.T) {
|
|||||||
|
|
||||||
// Register some nodes.
|
// Register some nodes.
|
||||||
nodes := []string{"node1", "node2"}
|
nodes := []string{"node1", "node2"}
|
||||||
for _, node := range nodes {
|
if err := registerNodes(nodes, codec); err != nil {
|
||||||
req := structs.RegisterRequest{
|
t.Fatal(err)
|
||||||
Datacenter: "dc1",
|
|
||||||
Node: node,
|
|
||||||
Address: "127.0.0.1",
|
|
||||||
}
|
|
||||||
var reply struct{}
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send an update for the first node. This should go through since we
|
// Send an update for the first node. This should go through since we
|
||||||
@ -309,16 +294,8 @@ func TestCoordinate_ListNodes(t *testing.T) {
|
|||||||
|
|
||||||
// Register some nodes.
|
// Register some nodes.
|
||||||
nodes := []string{"foo", "bar", "baz"}
|
nodes := []string{"foo", "bar", "baz"}
|
||||||
for _, node := range nodes {
|
if err := registerNodes(nodes, codec); err != nil {
|
||||||
req := structs.RegisterRequest{
|
t.Fatal(err)
|
||||||
Datacenter: "dc1",
|
|
||||||
Node: node,
|
|
||||||
Address: "127.0.0.1",
|
|
||||||
}
|
|
||||||
var reply struct{}
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send coordinate updates for a few nodes.
|
// Send coordinate updates for a few nodes.
|
||||||
@ -502,3 +479,164 @@ node "foo" {
|
|||||||
t.Fatalf("bad: %#v", resp.Coordinates)
|
t.Fatalf("bad: %#v", resp.Coordinates)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCoordinate_Node(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some nodes.
|
||||||
|
nodes := []string{"foo", "bar"}
|
||||||
|
if err := registerNodes(nodes, codec); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send coordinate updates for each node.
|
||||||
|
arg1 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "foo",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
arg2 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now query back for a specific node (make sure we only get coordinates for foo).
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
arg := structs.NodeSpecificRequest{
|
||||||
|
Node: "foo",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
resp := structs.IndexedCoordinates{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
|
||||||
|
r.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(resp.Coordinates) != 1 ||
|
||||||
|
resp.Coordinates[0].Node != "foo" {
|
||||||
|
r.Fatalf("bad: %v", resp.Coordinates)
|
||||||
|
}
|
||||||
|
verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoordinate_Node_ACLDeny(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.ACLEnforceVersion8 = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some nodes.
|
||||||
|
nodes := []string{"node1", "node2"}
|
||||||
|
if err := registerNodes(nodes, codec); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
coord := generateRandomCoordinate()
|
||||||
|
req := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node1",
|
||||||
|
Coord: coord,
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try a read for the first node. This should go through since we
|
||||||
|
// don't have version 8 ACLs enforced yet.
|
||||||
|
arg := structs.NodeSpecificRequest{
|
||||||
|
Node: "node1",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
resp := structs.IndexedCoordinates{}
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
|
||||||
|
r.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(resp.Coordinates) != 1 ||
|
||||||
|
resp.Coordinates[0].Node != "node1" {
|
||||||
|
r.Fatalf("bad: %v", resp.Coordinates)
|
||||||
|
}
|
||||||
|
verify.Values(t, "", resp.Coordinates[0].Coord, coord)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Now turn on version 8 enforcement and try again.
|
||||||
|
s1.config.ACLEnforceVersion8 = true
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp)
|
||||||
|
if !acl.IsErrPermissionDenied(err) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an ACL that can read from the node.
|
||||||
|
aclReq := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: `
|
||||||
|
node "node1" {
|
||||||
|
policy = "read"
|
||||||
|
}
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
var id string
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &aclReq, &id); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// With the token, it should now go through.
|
||||||
|
arg.Token = id
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// But it should be blocked for the other node.
|
||||||
|
arg.Node = "node2"
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp)
|
||||||
|
if !acl.IsErrPermissionDenied(err) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func registerNodes(nodes []string, codec rpc.ClientCodec) error {
|
||||||
|
for _, node := range nodes {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: node,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
}
|
||||||
|
var reply struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -22,7 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.
|
|||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
vec := make([]float64, len(nodes))
|
vec := make([]float64, len(nodes))
|
||||||
for i, node := range nodes {
|
for i, node := range nodes {
|
||||||
other, err := state.Coordinate(node.Node)
|
_, other, err := state.Coordinate(node.Node, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -62,7 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic
|
|||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
vec := make([]float64, len(nodes))
|
vec := make([]float64, len(nodes))
|
||||||
for i, node := range nodes {
|
for i, node := range nodes {
|
||||||
other, err := state.Coordinate(node.Node)
|
_, other, err := state.Coordinate(node.Node, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -102,7 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt
|
|||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
vec := make([]float64, len(checks))
|
vec := make([]float64, len(checks))
|
||||||
for i, check := range checks {
|
for i, check := range checks {
|
||||||
other, err := state.Coordinate(check.Node)
|
_, other, err := state.Coordinate(check.Node, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -142,7 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C
|
|||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
vec := make([]float64, len(nodes))
|
vec := make([]float64, len(nodes))
|
||||||
for i, node := range nodes {
|
for i, node := range nodes {
|
||||||
other, err := state.Coordinate(node.Node.Node)
|
_, other, err := state.Coordinate(node.Node.Node, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -203,7 +203,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
|
|||||||
// There won't always be coordinates for the source node. If there are
|
// There won't always be coordinates for the source node. If there are
|
||||||
// none then we can bail out because there's no meaning for the sort.
|
// none then we can bail out because there's no meaning for the sort.
|
||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
cs, err := state.Coordinate(source.Node)
|
_, cs, err := state.Coordinate(source.Node, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -42,21 +42,24 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
|||||||
|
|
||||||
// Coordinate returns a map of coordinates for the given node, indexed by
|
// Coordinate returns a map of coordinates for the given node, indexed by
|
||||||
// network segment.
|
// network segment.
|
||||||
func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) {
|
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
tableIdx := maxIndexTxn(tx, "coordinates")
|
||||||
|
|
||||||
iter, err := tx.Get("coordinates", "node", node)
|
iter, err := tx.Get("coordinates", "node", node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
results := make(lib.CoordinateSet)
|
results := make(lib.CoordinateSet)
|
||||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||||
coord := raw.(*structs.Coordinate)
|
coord := raw.(*structs.Coordinate)
|
||||||
results[coord.Segment] = coord.Coord
|
results[coord.Segment] = coord.Coord
|
||||||
}
|
}
|
||||||
return results, nil
|
return tableIdx, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Coordinates queries for all nodes with coordinates.
|
// Coordinates queries for all nodes with coordinates.
|
||||||
|
@ -42,7 +42,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||||||
}
|
}
|
||||||
verify.Values(t, "", all, structs.Coordinates{})
|
verify.Values(t, "", all, structs.Coordinates{})
|
||||||
|
|
||||||
coords, err := s.Coordinate("nope")
|
coordinateWs := memdb.NewWatchSet()
|
||||||
|
_, coords, err := s.Coordinate("nope", coordinateWs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
@ -63,7 +64,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||||||
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
|
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if watchFired(ws) {
|
if watchFired(ws) || watchFired(coordinateWs) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,13 +80,22 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||||||
}
|
}
|
||||||
verify.Values(t, "", all, structs.Coordinates{})
|
verify.Values(t, "", all, structs.Coordinates{})
|
||||||
|
|
||||||
|
coordinateWs = memdb.NewWatchSet()
|
||||||
|
idx, coords, err = s.Coordinate("node1", coordinateWs)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 1 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
// Register the nodes then do the update again.
|
// Register the nodes then do the update again.
|
||||||
testRegisterNode(t, s, 1, "node1")
|
testRegisterNode(t, s, 1, "node1")
|
||||||
testRegisterNode(t, s, 2, "node2")
|
testRegisterNode(t, s, 2, "node2")
|
||||||
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
|
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if !watchFired(ws) {
|
if !watchFired(ws) || !watchFired(coordinateWs) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,11 +111,16 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||||||
verify.Values(t, "", all, updates)
|
verify.Values(t, "", all, updates)
|
||||||
|
|
||||||
// Also verify the per-node coordinate interface.
|
// Also verify the per-node coordinate interface.
|
||||||
for _, update := range updates {
|
nodeWs := make([]memdb.WatchSet, len(updates))
|
||||||
coords, err := s.Coordinate(update.Node)
|
for i, update := range updates {
|
||||||
|
nodeWs[i] = memdb.NewWatchSet()
|
||||||
|
idx, coords, err := s.Coordinate(update.Node, nodeWs[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
if idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
expected := lib.CoordinateSet{
|
expected := lib.CoordinateSet{
|
||||||
"": update.Coord,
|
"": update.Coord,
|
||||||
}
|
}
|
||||||
@ -120,6 +135,11 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||||||
if !watchFired(ws) {
|
if !watchFired(ws) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
for _, ws := range nodeWs {
|
||||||
|
if !watchFired(ws) {
|
||||||
|
t.Fatalf("bad")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Verify it got applied.
|
// Verify it got applied.
|
||||||
idx, all, err = s.Coordinates(nil)
|
idx, all, err = s.Coordinates(nil)
|
||||||
@ -133,10 +153,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||||||
|
|
||||||
// And check the per-node coordinate version of the same thing.
|
// And check the per-node coordinate version of the same thing.
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
coords, err := s.Coordinate(update.Node)
|
idx, coords, err := s.Coordinate(update.Node, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
if idx != 4 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
expected := lib.CoordinateSet{
|
expected := lib.CoordinateSet{
|
||||||
"": update.Coord,
|
"": update.Coord,
|
||||||
}
|
}
|
||||||
@ -188,7 +211,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Make sure it's in there.
|
// Make sure it's in there.
|
||||||
coords, err := s.Coordinate("node1")
|
_, coords, err := s.Coordinate("node1", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
@ -204,7 +227,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Make sure the coordinate is gone.
|
// Make sure the coordinate is gone.
|
||||||
coords, err = s.Coordinate("node1")
|
_, coords, err = s.Coordinate("node1", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
@ -85,22 +86,55 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use empty list instead of nil.
|
return filterCoordinates(req, out.Coordinates), nil
|
||||||
if out.Coordinates == nil {
|
}
|
||||||
out.Coordinates = make(structs.Coordinates, 0)
|
|
||||||
}
|
// CoordinateNode returns the LAN node in the given datacenter, along with
|
||||||
|
// raw network coordinates.
|
||||||
// Filter by segment if applicable
|
func (s *HTTPServer) CoordinateNode(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 {
|
if req.Method != "GET" {
|
||||||
segment := v[0]
|
return nil, MethodNotAllowedError{req.Method, []string{"GET"}}
|
||||||
filtered := make(structs.Coordinates, 0)
|
}
|
||||||
for _, coord := range out.Coordinates {
|
|
||||||
if coord.Segment == segment {
|
node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/")
|
||||||
filtered = append(filtered, coord)
|
args := structs.NodeSpecificRequest{Node: node}
|
||||||
}
|
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||||
}
|
return nil, nil
|
||||||
out.Coordinates = filtered
|
}
|
||||||
}
|
|
||||||
|
var out structs.IndexedCoordinates
|
||||||
return out.Coordinates, nil
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
if err := s.agent.RPC("Coordinate.Node", &args, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result := filterCoordinates(req, out.Coordinates)
|
||||||
|
if len(result) == 0 {
|
||||||
|
resp.WriteHeader(http.StatusNotFound)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterCoordinates(req *http.Request, in structs.Coordinates) structs.Coordinates {
|
||||||
|
out := structs.Coordinates{}
|
||||||
|
|
||||||
|
if in == nil {
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
segment := ""
|
||||||
|
v, filterBySegment := req.URL.Query()["segment"]
|
||||||
|
if filterBySegment && len(v) > 0 {
|
||||||
|
segment = v[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range in {
|
||||||
|
if filterBySegment && c.Segment != segment {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
out = append(out, c)
|
||||||
|
}
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
@ -140,3 +140,106 @@ func TestCoordinate_Nodes(t *testing.T) {
|
|||||||
t.Fatalf("bad: %v", coordinates)
|
t.Fatalf("bad: %v", coordinates)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCoordinate_Node(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
// Make sure we get a 404 with no coordinates.
|
||||||
|
req, _ := http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.CoordinateNode(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if resp.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("bad: %v", resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the nodes.
|
||||||
|
nodes := []string{"foo", "bar"}
|
||||||
|
for _, node := range nodes {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: node,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
}
|
||||||
|
var reply struct{}
|
||||||
|
if err := a.RPC("Catalog.Register", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send some coordinates for a few nodes, waiting a little while for the
|
||||||
|
// batch update to run.
|
||||||
|
arg1 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "foo",
|
||||||
|
Segment: "alpha",
|
||||||
|
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := a.RPC("Coordinate.Update", &arg1, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
arg2 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
|
||||||
|
}
|
||||||
|
if err := a.RPC("Coordinate.Update", &arg2, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
|
||||||
|
// Query back and check the nodes are present.
|
||||||
|
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil)
|
||||||
|
resp = httptest.NewRecorder()
|
||||||
|
obj, err = a.srv.CoordinateNode(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
coordinates := obj.(structs.Coordinates)
|
||||||
|
if len(coordinates) != 1 ||
|
||||||
|
coordinates[0].Node != "foo" {
|
||||||
|
t.Fatalf("bad: %v", coordinates)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter on a nonexistant node segment
|
||||||
|
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=nope", nil)
|
||||||
|
resp = httptest.NewRecorder()
|
||||||
|
obj, err = a.srv.CoordinateNode(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if resp.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("bad: %v", resp.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter on a real node segment
|
||||||
|
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=alpha", nil)
|
||||||
|
resp = httptest.NewRecorder()
|
||||||
|
obj, err = a.srv.CoordinateNode(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
coordinates = obj.(structs.Coordinates)
|
||||||
|
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
|
||||||
|
t.Fatalf("bad: %v", coordinates)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the empty filter works
|
||||||
|
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=", nil)
|
||||||
|
resp = httptest.NewRecorder()
|
||||||
|
obj, err = a.srv.CoordinateNode(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if resp.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("bad: %v", resp.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -139,9 +139,11 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {
|
|||||||
if !s.agent.config.DisableCoordinates {
|
if !s.agent.config.DisableCoordinates {
|
||||||
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
|
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
|
||||||
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
|
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
|
||||||
|
handleFuncMetrics("/v1/coordinate/node/", s.wrap(s.CoordinateNode))
|
||||||
} else {
|
} else {
|
||||||
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
|
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
|
||||||
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
|
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
|
||||||
|
handleFuncMetrics("/v1/coordinate/node/", s.wrap(coordinateDisabled))
|
||||||
}
|
}
|
||||||
handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
|
handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
|
||||||
handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
|
handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
|
||||||
|
@ -350,6 +350,7 @@ func TestHTTPAPI_MethodNotAllowed(t *testing.T) {
|
|||||||
{"GET", "/v1/catalog/services"},
|
{"GET", "/v1/catalog/services"},
|
||||||
{"GET", "/v1/coordinate/datacenters"},
|
{"GET", "/v1/coordinate/datacenters"},
|
||||||
{"GET", "/v1/coordinate/nodes"},
|
{"GET", "/v1/coordinate/nodes"},
|
||||||
|
{"GET", "/v1/coordinate/node/"},
|
||||||
{"PUT", "/v1/event/fire/"},
|
{"PUT", "/v1/event/fire/"},
|
||||||
{"GET", "/v1/event/list"},
|
{"GET", "/v1/event/list"},
|
||||||
{"GET", "/v1/health/checks/"},
|
{"GET", "/v1/health/checks/"},
|
||||||
|
@ -66,3 +66,24 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err
|
|||||||
}
|
}
|
||||||
return out, qm, nil
|
return out, qm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Node is used to return the coordinates of a single in the LAN pool.
|
||||||
|
func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
|
||||||
|
r := c.c.newRequest("GET", "/v1/coordinate/node/"+node)
|
||||||
|
r.setQueryOptions(q)
|
||||||
|
rtt, resp, err := requireOK(c.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
qm := &QueryMeta{}
|
||||||
|
parseQueryMeta(resp, qm)
|
||||||
|
qm.RequestTime = rtt
|
||||||
|
|
||||||
|
var out []*CoordinateEntry
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return out, qm, nil
|
||||||
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
@ -42,3 +43,22 @@ func TestAPI_CoordinateNodes(t *testing.T) {
|
|||||||
// get an error.
|
// get an error.
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAPI_CoordinateNode(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
coordinate := c.Coordinate()
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, _, err := coordinate.Node(s.Config.NodeName, nil)
|
||||||
|
if err != nil && !strings.Contains(err.Error(), "Unexpected response code: 404") {
|
||||||
|
r.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// There's not a good way to populate coordinates without
|
||||||
|
// waiting for them to calculate and update, so the best
|
||||||
|
// we can do is call the endpoint and make sure we don't
|
||||||
|
// get an error.
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -71,7 +71,7 @@ In **Consul Enterprise**, this will include coordinates for user-added network
|
|||||||
areas as well, as indicated by the `AreaID`. Coordinates are only compatible
|
areas as well, as indicated by the `AreaID`. Coordinates are only compatible
|
||||||
within the same area.
|
within the same area.
|
||||||
|
|
||||||
## Read LAN Coordinates
|
## Read LAN Coordinates for all nodes
|
||||||
|
|
||||||
This endpoint returns the LAN network coordinates for all nodes in a given
|
This endpoint returns the LAN network coordinates for all nodes in a given
|
||||||
datacenter.
|
datacenter.
|
||||||
@ -94,6 +94,11 @@ The table below shows this endpoint's support for
|
|||||||
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
|
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
|
||||||
the datacenter of the agent being queried. This is specified as part of the
|
the datacenter of the agent being queried. This is specified as part of the
|
||||||
URL as a query parameter.
|
URL as a query parameter.
|
||||||
|
- `segment` `(string: "")` - (Enterprise-only) Specifies the segment to list members for.
|
||||||
|
If left blank, this will query for the default segment when connecting to a server and
|
||||||
|
the agent's own segment when connecting to a client (clients can only be part of one
|
||||||
|
network segment). When querying a server, setting this to the special string `_all`
|
||||||
|
will show members in all segments.
|
||||||
|
|
||||||
### Sample Request
|
### Sample Request
|
||||||
|
|
||||||
@ -122,3 +127,59 @@ $ curl \
|
|||||||
In **Consul Enterprise**, this may include multiple coordinates for the same node,
|
In **Consul Enterprise**, this may include multiple coordinates for the same node,
|
||||||
each marked with a different `Segment`. Coordinates are only compatible within the same
|
each marked with a different `Segment`. Coordinates are only compatible within the same
|
||||||
segment.
|
segment.
|
||||||
|
|
||||||
|
## Read LAN Coordinates for a node
|
||||||
|
|
||||||
|
This endpoint returns the LAN network coordinates for the given node.
|
||||||
|
|
||||||
|
| Method | Path | Produces |
|
||||||
|
| ------ | ---------------------------- | -------------------------- |
|
||||||
|
| `GET` | `/coordinate/node/:node` | `application/json` |
|
||||||
|
|
||||||
|
The table below shows this endpoint's support for
|
||||||
|
[blocking queries](/api/index.html#blocking-queries),
|
||||||
|
[consistency modes](/api/index.html#consistency-modes), and
|
||||||
|
[required ACLs](/api/index.html#acls).
|
||||||
|
|
||||||
|
| Blocking Queries | Consistency Modes | ACL Required |
|
||||||
|
| ---------------- | ----------------- | ------------ |
|
||||||
|
| `YES` | `all` | `node:read` |
|
||||||
|
|
||||||
|
### Parameters
|
||||||
|
|
||||||
|
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
|
||||||
|
the datacenter of the agent being queried. This is specified as part of the
|
||||||
|
URL as a query parameter.
|
||||||
|
- `segment` `(string: "")` - (Enterprise-only) Specifies the segment to list members for.
|
||||||
|
If left blank, this will query for the default segment when connecting to a server and
|
||||||
|
the agent's own segment when connecting to a client (clients can only be part of one
|
||||||
|
network segment). When querying a server, setting this to the special string `_all`
|
||||||
|
will show members in all segments.
|
||||||
|
|
||||||
|
### Sample Request
|
||||||
|
|
||||||
|
```text
|
||||||
|
$ curl \
|
||||||
|
https://consul.rocks/v1/coordinate/node/agent-one
|
||||||
|
```
|
||||||
|
|
||||||
|
### Sample Response
|
||||||
|
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"Node": "agent-one",
|
||||||
|
"Segment": "",
|
||||||
|
"Coord": {
|
||||||
|
"Adjustment": 0,
|
||||||
|
"Error": 1.5,
|
||||||
|
"Height": 0,
|
||||||
|
"Vec": [0, 0, 0, 0, 0, 0, 0, 0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
In **Consul Enterprise**, this may include multiple coordinates for the same node,
|
||||||
|
each marked with a different `Segment`. Coordinates are only compatible within the same
|
||||||
|
segment.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user