diff --git a/command/agent/local_test.go b/command/agent/local_test.go index d456d387bb..7d595e9e40 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -841,14 +841,15 @@ func TestAgent_sendCoordinate(t *testing.T) { conf.SyncCoordinateInterval = 10 * time.Millisecond time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod) - // Inject a random coordinate so we can confirm that the periodic process + // Inject a sentinel coordinate so we can confirm that the periodic process // is still able to update it. - zeroCoord := &coordinate.Coordinate{} + sentinel := coordinate.NewCoordinate(coordinate.DefaultConfig()) + sentinel.Vec[0] = 23.0 func() { req := structs.CoordinateUpdateRequest{ Datacenter: agent.config.Datacenter, Node: agent.config.NodeName, - Coord: zeroCoord, + Coord: sentinel, WriteRequest: structs.WriteRequest{Token: agent.config.ACLToken}, } var reply struct{} @@ -861,7 +862,8 @@ func TestAgent_sendCoordinate(t *testing.T) { // to fire. time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod) - // Make sure the injected coordinate is not the one that's present. + // Make sure the injected coordinate is not the one that's present since + // there should have been some more periodic updates. req = structs.NodeSpecificRequest{ Datacenter: agent.config.Datacenter, Node: agent.config.NodeName, @@ -869,7 +871,7 @@ func TestAgent_sendCoordinate(t *testing.T) { if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil { t.Fatalf("err: %v", err) } - if reflect.DeepEqual(zeroCoord, reply.Coord) { - t.Fatalf("should not have gotten the zero coordinate") + if reflect.DeepEqual(sentinel, reply.Coord) { + t.Fatalf("should not have gotten the sentinel coordinate") } } diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index b57549b65b..b09cff1579 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -30,7 +30,9 @@ func NewCoordinate(srv *Server) *Coordinate { for { select { case <-time.After(srv.config.CoordinateUpdatePeriod): - c.batchApplyUpdates() + if err := c.batchApplyUpdates(); err != nil { + c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err) + } case <-srv.shutdownCh: return } @@ -42,7 +44,7 @@ func NewCoordinate(srv *Server) *Coordinate { // batchApplyUpdates is a non-blocking routine that applies all pending updates // to the Raft log. -func (c *Coordinate) batchApplyUpdates() { +func (c *Coordinate) batchApplyUpdates() error { var updates []*structs.Coordinate for done := false; !done; { select { @@ -54,19 +56,25 @@ func (c *Coordinate) batchApplyUpdates() { } if len(updates) > 0 { - _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates) - if err != nil { - c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err) + if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates); err != nil { + return err } } + return nil } // Update inserts or updates the LAN coordinate of a node. -func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { +func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { return err } + // Since this is a coordinate coming from some place else we harden this + // and look for dimensionality problems proactively. + if !c.srv.serfLAN.GetCoordinate().IsCompatibleWith(args.Coord) { + return fmt.Errorf("rejected bad coordinate: %v", args.Coord) + } + // Perform a non-blocking write to the channel. We'd rather spill updates // than gum things up blocking here. update := &structs.Coordinate{Node: args.Node, Coord: args.Coord} @@ -74,7 +82,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct case c.updateCh <- update: // This is a noop - we are done if the write went through. default: - return fmt.Errorf("Coordinate update rate limit exceeded, increase SyncCoordinateInterval") + return fmt.Errorf("coordinate update rate limit exceeded, increase SyncCoordinateInterval") } return nil diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index 1205346e0e..fd8e0acd00 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -140,6 +140,14 @@ func TestCoordinate_Update(t *testing.T) { t.Fatalf("should return a coordinate but it's nil") } verifyCoordinatesEqual(t, c, arg1.Coord) + + // Finally, send a coordinate with the wrong dimensionality to make sure + // there are no panics, and that it gets rejected. + arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec)) + err = client.Call("Coordinate.Update", &arg2, &out) + if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") { + t.Fatalf("should have failed with an error, got %v", err) + } } func TestCoordinate_Get(t *testing.T) {