diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index a001fad7b0..abf255fbcc 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -200,3 +200,45 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I return nil }) } + +// ListNodes returns the list of nodes with their raw network coordinates (if no +// coordinates are available for a node it won't appear in this list). +func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { + if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done { + return err + } + + // Fetch the ACL token, if any, and enforce the node policy if enabled. + rule, err := c.srv.resolveToken(args.Token) + if err != nil { + return err + } + if rule != nil && c.srv.config.ACLEnforceVersion8 { + // We don't enforce the sentinel policy here, since at this time + // sentinel only applies to creating or updating node or service + // info, not updating coordinates. + if !rule.NodeWrite(args.Node, nil) { + return acl.ErrPermissionDenied + } + } + + return c.srv.blockingQuery(&args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, nodeCoords, err := state.Coordinate(args.Node, ws) + if err != nil { + return err + } + + var coords structs.Coordinates + for segment, coord := range nodeCoords { + coords = append(coords, &structs.Coordinate{ + Node: args.Node, + Segment: segment, + Coord: coord, + }) + } + reply.Index, reply.Coordinates = index, coords + return nil + }) +} diff --git a/agent/consul/coordinate_endpoint_test.go b/agent/consul/coordinate_endpoint_test.go index ba9ded9a7f..5f23aeb2cb 100644 --- a/agent/consul/coordinate_endpoint_test.go +++ b/agent/consul/coordinate_endpoint_test.go @@ -86,13 +86,13 @@ func TestCoordinate_Update(t *testing.T) { // Make sure the updates did not yet apply because the update period // hasn't expired. state := s1.fsm.State() - c, err := state.Coordinate("node1") + _, c, err := state.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %v", err) } verify.Values(t, "", c, lib.CoordinateSet{}) - c, err = state.Coordinate("node2") + _, c, err = state.Coordinate("node2", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -107,7 +107,7 @@ func TestCoordinate_Update(t *testing.T) { // Wait a while and the updates should get picked up. time.Sleep(3 * s1.config.CoordinateUpdatePeriod) - c, err = state.Coordinate("node1") + _, c, err = state.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -116,7 +116,7 @@ func TestCoordinate_Update(t *testing.T) { } verify.Values(t, "", c, expected) - c, err = state.Coordinate("node2") + _, c, err = state.Coordinate("node2", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -155,7 +155,7 @@ func TestCoordinate_Update(t *testing.T) { time.Sleep(3 * s1.config.CoordinateUpdatePeriod) numDropped := 0 for i := 0; i < spamLen; i++ { - c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i)) + _, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil) if err != nil { t.Fatalf("err: %v", err) } @@ -502,3 +502,147 @@ node "foo" { t.Fatalf("bad: %#v", resp.Coordinates) } } + +func TestCoordinate_Node(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register some nodes. + nodes := []string{"foo", "bar"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + } + var reply struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Send coordinate updates for each node. + arg1 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "foo", + Coord: generateRandomCoordinate(), + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { + t.Fatalf("err: %v", err) + } + + arg2 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "bar", + Coord: generateRandomCoordinate(), + } + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Now query back for a specific node (make sure we only get coordinates for foo). + retry.Run(t, func(r *retry.R) { + arg := structs.NodeSpecificRequest{ + Node: "foo", + Datacenter: "dc1", + } + resp := structs.IndexedCoordinates{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil { + r.Fatalf("err: %v", err) + } + if len(resp.Coordinates) != 1 || + resp.Coordinates[0].Node != "foo" { + r.Fatalf("bad: %v", resp.Coordinates) + } + verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo + }) +} + +func TestCoordinate_Node_ACLDeny(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register some nodes. + nodes := []string{"node1", "node2"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + } + var reply struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Send an update for the first node. This should go through since we + // don't have version 8 ACLs enforced yet. + req := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node1", + Coord: generateRandomCoordinate(), + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Now turn on version 8 enforcement and try again. + s1.config.ACLEnforceVersion8 = true + err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // Create an ACL that can write to the node. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +node "node1" { + policy = "write" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // With the token, it should now go through. + req.Token = id + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // But it should be blocked for the other node. + req.Node = "node2" + err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } +} diff --git a/agent/consul/rtt.go b/agent/consul/rtt.go index 84a39f34dd..8bb1bcaec0 100644 --- a/agent/consul/rtt.go +++ b/agent/consul/rtt.go @@ -22,7 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort. state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - other, err := state.Coordinate(node.Node) + _, other, err := state.Coordinate(node.Node, nil) if err != nil { return nil, err } @@ -62,7 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - other, err := state.Coordinate(node.Node) + _, other, err := state.Coordinate(node.Node, nil) if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt state := s.fsm.State() vec := make([]float64, len(checks)) for i, check := range checks { - other, err := state.Coordinate(check.Node) + _, other, err := state.Coordinate(check.Node, nil) if err != nil { return nil, err } @@ -142,7 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - other, err := state.Coordinate(node.Node.Node) + _, other, err := state.Coordinate(node.Node.Node, nil) if err != nil { return nil, err } @@ -203,7 +203,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf // There won't always be coordinates for the source node. If there are // none then we can bail out because there's no meaning for the sort. state := s.fsm.State() - cs, err := state.Coordinate(source.Node) + _, cs, err := state.Coordinate(source.Node, nil) if err != nil { return err } diff --git a/agent/consul/state/coordinate.go b/agent/consul/state/coordinate.go index 83db264553..68087b1cb1 100644 --- a/agent/consul/state/coordinate.go +++ b/agent/consul/state/coordinate.go @@ -42,21 +42,25 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error { // Coordinate returns a map of coordinates for the given node, indexed by // network segment. -func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) { +func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) { tx := s.db.Txn(false) defer tx.Abort() + // Get the table index. + idx := maxIndexTxn(tx, "coordinates") + iter, err := tx.Get("coordinates", "node", node) if err != nil { - return nil, fmt.Errorf("failed coordinate lookup: %s", err) + return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err) } + ws.Add(iter.WatchCh()) results := make(lib.CoordinateSet) for raw := iter.Next(); raw != nil; raw = iter.Next() { coord := raw.(*structs.Coordinate) results[coord.Segment] = coord.Coord } - return results, nil + return idx, results, nil } // Coordinates queries for all nodes with coordinates. diff --git a/agent/consul/state/coordinate_test.go b/agent/consul/state/coordinate_test.go index b126e44787..dcc59c864b 100644 --- a/agent/consul/state/coordinate_test.go +++ b/agent/consul/state/coordinate_test.go @@ -42,7 +42,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { } verify.Values(t, "", all, structs.Coordinates{}) - coords, err := s.Coordinate("nope") + _, coords, err := s.Coordinate("nope", nil) if err != nil { t.Fatalf("err: %s", err) } @@ -102,7 +102,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Also verify the per-node coordinate interface. for _, update := range updates { - coords, err := s.Coordinate(update.Node) + _, coords, err := s.Coordinate(update.Node, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -133,7 +133,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // And check the per-node coordinate version of the same thing. for _, update := range updates { - coords, err := s.Coordinate(update.Node) + _, coords, err := s.Coordinate(update.Node, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -188,7 +188,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { } // Make sure it's in there. - coords, err := s.Coordinate("node1") + _, coords, err := s.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %s", err) } @@ -204,7 +204,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { } // Make sure the coordinate is gone. - coords, err = s.Coordinate("node1") + _, coords, err = s.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/coordinate_endpoint.go b/agent/coordinate_endpoint.go index 8dd944393c..ade8b582a9 100644 --- a/agent/coordinate_endpoint.go +++ b/agent/coordinate_endpoint.go @@ -96,19 +96,19 @@ func (s *HTTPServer) CoordinateNode(resp http.ResponseWriter, req *http.Request) return nil, MethodNotAllowedError{req.Method, []string{"GET"}} } - args := structs.DCSpecificRequest{} + node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/") + args := structs.NodeSpecificRequest{Node: node} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } var out structs.IndexedCoordinates defer setMeta(resp, &out.QueryMeta) - if err := s.agent.RPC("Coordinate.ListNodes", &args, &out); err != nil { + if err := s.agent.RPC("Coordinate.Node", &args, &out); err != nil { sort.Sort(&sorter{out.Coordinates}) return nil, err } - node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/") return filterCoordinates(req, node, out.Coordinates), nil } diff --git a/api/coordinate.go b/api/coordinate.go index 90214e392c..42df0decaf 100644 --- a/api/coordinate.go +++ b/api/coordinate.go @@ -66,3 +66,24 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err } return out, qm, nil } + +// Node is used to return the coordinates of a single in the LAN pool. +func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) { + r := c.c.newRequest("GET", "/v1/coordinate/node/"+node) + r.setQueryOptions(q) + rtt, resp, err := requireOK(c.c.doRequest(r)) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + var out []*CoordinateEntry + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + return out, qm, nil +} diff --git a/api/coordinate_test.go b/api/coordinate_test.go index f41756b57a..a91d1869b1 100644 --- a/api/coordinate_test.go +++ b/api/coordinate_test.go @@ -42,3 +42,22 @@ func TestAPI_CoordinateNodes(t *testing.T) { // get an error. }) } + +func TestAPI_CoordinateNode(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + coordinate := c.Coordinate() + retry.Run(t, func(r *retry.R) { + _, _, err := coordinate.Node(s.Config.NodeName, nil) + if err != nil { + r.Fatal(err) + } + + // There's not a good way to populate coordinates without + // waiting for them to calculate and update, so the best + // we can do is call the endpoint and make sure we don't + // get an error. + }) +}