mirror of https://github.com/status-im/consul.git
Added Coordinate.Node rpc endpoint and client api method
This commit is contained in:
parent
ca9aac746f
commit
5589eadcf5
|
@ -200,3 +200,45 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
|
|||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListNodes returns the list of nodes with their raw network coordinates (if no
|
||||
// coordinates are available for a node it won't appear in this list).
|
||||
func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error {
|
||||
if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
||||
rule, err := c.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rule != nil && c.srv.config.ACLEnforceVersion8 {
|
||||
// We don't enforce the sentinel policy here, since at this time
|
||||
// sentinel only applies to creating or updating node or service
|
||||
// info, not updating coordinates.
|
||||
if !rule.NodeWrite(args.Node, nil) {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
|
||||
return c.srv.blockingQuery(&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, nodeCoords, err := state.Coordinate(args.Node, ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var coords structs.Coordinates
|
||||
for segment, coord := range nodeCoords {
|
||||
coords = append(coords, &structs.Coordinate{
|
||||
Node: args.Node,
|
||||
Segment: segment,
|
||||
Coord: coord,
|
||||
})
|
||||
}
|
||||
reply.Index, reply.Coordinates = index, coords
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -86,13 +86,13 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
// Make sure the updates did not yet apply because the update period
|
||||
// hasn't expired.
|
||||
state := s1.fsm.State()
|
||||
c, err := state.Coordinate("node1")
|
||||
_, c, err := state.Coordinate("node1", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
verify.Values(t, "", c, lib.CoordinateSet{})
|
||||
|
||||
c, err = state.Coordinate("node2")
|
||||
_, c, err = state.Coordinate("node2", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
|
||||
// Wait a while and the updates should get picked up.
|
||||
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
||||
c, err = state.Coordinate("node1")
|
||||
_, c, err = state.Coordinate("node1", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
}
|
||||
verify.Values(t, "", c, expected)
|
||||
|
||||
c, err = state.Coordinate("node2")
|
||||
_, c, err = state.Coordinate("node2", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
||||
numDropped := 0
|
||||
for i := 0; i < spamLen; i++ {
|
||||
c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i))
|
||||
_, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -502,3 +502,147 @@ node "foo" {
|
|||
t.Fatalf("bad: %#v", resp.Coordinates)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinate_Node(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Register some nodes.
|
||||
nodes := []string{"foo", "bar"}
|
||||
for _, node := range nodes {
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: node,
|
||||
Address: "127.0.0.1",
|
||||
}
|
||||
var reply struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Send coordinate updates for each node.
|
||||
arg1 := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
var out struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
arg2 := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now query back for a specific node (make sure we only get coordinates for foo).
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
arg := structs.NodeSpecificRequest{
|
||||
Node: "foo",
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
resp := structs.IndexedCoordinates{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(resp.Coordinates) != 1 ||
|
||||
resp.Coordinates[0].Node != "foo" {
|
||||
r.Fatalf("bad: %v", resp.Coordinates)
|
||||
}
|
||||
verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo
|
||||
})
|
||||
}
|
||||
|
||||
func TestCoordinate_Node_ACLDeny(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.ACLEnforceVersion8 = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Register some nodes.
|
||||
nodes := []string{"node1", "node2"}
|
||||
for _, node := range nodes {
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: node,
|
||||
Address: "127.0.0.1",
|
||||
}
|
||||
var reply struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Send an update for the first node. This should go through since we
|
||||
// don't have version 8 ACLs enforced yet.
|
||||
req := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
var out struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now turn on version 8 enforcement and try again.
|
||||
s1.config.ACLEnforceVersion8 = true
|
||||
err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create an ACL that can write to the node.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: `
|
||||
node "node1" {
|
||||
policy = "write"
|
||||
}
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// With the token, it should now go through.
|
||||
req.Token = id
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// But it should be blocked for the other node.
|
||||
req.Node = "node2"
|
||||
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
|
||||
if !acl.IsErrPermissionDenied(err) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.
|
|||
state := s.fsm.State()
|
||||
vec := make([]float64, len(nodes))
|
||||
for i, node := range nodes {
|
||||
other, err := state.Coordinate(node.Node)
|
||||
_, other, err := state.Coordinate(node.Node, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic
|
|||
state := s.fsm.State()
|
||||
vec := make([]float64, len(nodes))
|
||||
for i, node := range nodes {
|
||||
other, err := state.Coordinate(node.Node)
|
||||
_, other, err := state.Coordinate(node.Node, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt
|
|||
state := s.fsm.State()
|
||||
vec := make([]float64, len(checks))
|
||||
for i, check := range checks {
|
||||
other, err := state.Coordinate(check.Node)
|
||||
_, other, err := state.Coordinate(check.Node, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C
|
|||
state := s.fsm.State()
|
||||
vec := make([]float64, len(nodes))
|
||||
for i, node := range nodes {
|
||||
other, err := state.Coordinate(node.Node.Node)
|
||||
_, other, err := state.Coordinate(node.Node.Node, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
|
|||
// There won't always be coordinates for the source node. If there are
|
||||
// none then we can bail out because there's no meaning for the sort.
|
||||
state := s.fsm.State()
|
||||
cs, err := state.Coordinate(source.Node)
|
||||
_, cs, err := state.Coordinate(source.Node, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -42,21 +42,25 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
|||
|
||||
// Coordinate returns a map of coordinates for the given node, indexed by
|
||||
// network segment.
|
||||
func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) {
|
||||
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "coordinates")
|
||||
|
||||
iter, err := tx.Get("coordinates", "node", node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
results := make(lib.CoordinateSet)
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
coord := raw.(*structs.Coordinate)
|
||||
results[coord.Segment] = coord.Coord
|
||||
}
|
||||
return results, nil
|
||||
return idx, results, nil
|
||||
}
|
||||
|
||||
// Coordinates queries for all nodes with coordinates.
|
||||
|
|
|
@ -42,7 +42,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||
}
|
||||
verify.Values(t, "", all, structs.Coordinates{})
|
||||
|
||||
coords, err := s.Coordinate("nope")
|
||||
_, coords, err := s.Coordinate("nope", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||
|
||||
// Also verify the per-node coordinate interface.
|
||||
for _, update := range updates {
|
||||
coords, err := s.Coordinate(update.Node)
|
||||
_, coords, err := s.Coordinate(update.Node, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
|||
|
||||
// And check the per-node coordinate version of the same thing.
|
||||
for _, update := range updates {
|
||||
coords, err := s.Coordinate(update.Node)
|
||||
_, coords, err := s.Coordinate(update.Node, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure it's in there.
|
||||
coords, err := s.Coordinate("node1")
|
||||
_, coords, err := s.Coordinate("node1", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the coordinate is gone.
|
||||
coords, err = s.Coordinate("node1")
|
||||
_, coords, err = s.Coordinate("node1", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -96,19 +96,19 @@ func (s *HTTPServer) CoordinateNode(resp http.ResponseWriter, req *http.Request)
|
|||
return nil, MethodNotAllowedError{req.Method, []string{"GET"}}
|
||||
}
|
||||
|
||||
args := structs.DCSpecificRequest{}
|
||||
node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/")
|
||||
args := structs.NodeSpecificRequest{Node: node}
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.IndexedCoordinates
|
||||
defer setMeta(resp, &out.QueryMeta)
|
||||
if err := s.agent.RPC("Coordinate.ListNodes", &args, &out); err != nil {
|
||||
if err := s.agent.RPC("Coordinate.Node", &args, &out); err != nil {
|
||||
sort.Sort(&sorter{out.Coordinates})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/")
|
||||
return filterCoordinates(req, node, out.Coordinates), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -66,3 +66,24 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err
|
|||
}
|
||||
return out, qm, nil
|
||||
}
|
||||
|
||||
// Node is used to return the coordinates of a single in the LAN pool.
|
||||
func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
|
||||
r := c.c.newRequest("GET", "/v1/coordinate/node/"+node)
|
||||
r.setQueryOptions(q)
|
||||
rtt, resp, err := requireOK(c.c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
qm := &QueryMeta{}
|
||||
parseQueryMeta(resp, qm)
|
||||
qm.RequestTime = rtt
|
||||
|
||||
var out []*CoordinateEntry
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return out, qm, nil
|
||||
}
|
||||
|
|
|
@ -42,3 +42,22 @@ func TestAPI_CoordinateNodes(t *testing.T) {
|
|||
// get an error.
|
||||
})
|
||||
}
|
||||
|
||||
func TestAPI_CoordinateNode(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
||||
coordinate := c.Coordinate()
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, _, err := coordinate.Node(s.Config.NodeName, nil)
|
||||
if err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
|
||||
// There's not a good way to populate coordinates without
|
||||
// waiting for them to calculate and update, so the best
|
||||
// we can do is call the endpoint and make sure we don't
|
||||
// get an error.
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue