mirror of https://github.com/status-im/consul.git
Hardens Consul from bad coordinates from other nodes.
This commit is contained in:
parent
2cee9f7e2f
commit
7e6d52109b
|
@ -841,14 +841,15 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
conf.SyncCoordinateInterval = 10 * time.Millisecond
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Inject a random coordinate so we can confirm that the periodic process
|
||||
// Inject a sentinel coordinate so we can confirm that the periodic process
|
||||
// is still able to update it.
|
||||
zeroCoord := &coordinate.Coordinate{}
|
||||
sentinel := coordinate.NewCoordinate(coordinate.DefaultConfig())
|
||||
sentinel.Vec[0] = 23.0
|
||||
func() {
|
||||
req := structs.CoordinateUpdateRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Node: agent.config.NodeName,
|
||||
Coord: zeroCoord,
|
||||
Coord: sentinel,
|
||||
WriteRequest: structs.WriteRequest{Token: agent.config.ACLToken},
|
||||
}
|
||||
var reply struct{}
|
||||
|
@ -861,7 +862,8 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
// to fire.
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Make sure the injected coordinate is not the one that's present.
|
||||
// Make sure the injected coordinate is not the one that's present since
|
||||
// there should have been some more periodic updates.
|
||||
req = structs.NodeSpecificRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Node: agent.config.NodeName,
|
||||
|
@ -869,7 +871,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reflect.DeepEqual(zeroCoord, reply.Coord) {
|
||||
t.Fatalf("should not have gotten the zero coordinate")
|
||||
if reflect.DeepEqual(sentinel, reply.Coord) {
|
||||
t.Fatalf("should not have gotten the sentinel coordinate")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,9 @@ func NewCoordinate(srv *Server) *Coordinate {
|
|||
for {
|
||||
select {
|
||||
case <-time.After(srv.config.CoordinateUpdatePeriod):
|
||||
c.batchApplyUpdates()
|
||||
if err := c.batchApplyUpdates(); err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err)
|
||||
}
|
||||
case <-srv.shutdownCh:
|
||||
return
|
||||
}
|
||||
|
@ -42,7 +44,7 @@ func NewCoordinate(srv *Server) *Coordinate {
|
|||
|
||||
// batchApplyUpdates is a non-blocking routine that applies all pending updates
|
||||
// to the Raft log.
|
||||
func (c *Coordinate) batchApplyUpdates() {
|
||||
func (c *Coordinate) batchApplyUpdates() error {
|
||||
var updates []*structs.Coordinate
|
||||
for done := false; !done; {
|
||||
select {
|
||||
|
@ -54,19 +56,25 @@ func (c *Coordinate) batchApplyUpdates() {
|
|||
}
|
||||
|
||||
if len(updates) > 0 {
|
||||
_, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err)
|
||||
if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update inserts or updates the LAN coordinate of a node.
|
||||
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
|
||||
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) {
|
||||
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// Since this is a coordinate coming from some place else we harden this
|
||||
// and look for dimensionality problems proactively.
|
||||
if !c.srv.serfLAN.GetCoordinate().IsCompatibleWith(args.Coord) {
|
||||
return fmt.Errorf("rejected bad coordinate: %v", args.Coord)
|
||||
}
|
||||
|
||||
// Perform a non-blocking write to the channel. We'd rather spill updates
|
||||
// than gum things up blocking here.
|
||||
update := &structs.Coordinate{Node: args.Node, Coord: args.Coord}
|
||||
|
@ -74,7 +82,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
|||
case c.updateCh <- update:
|
||||
// This is a noop - we are done if the write went through.
|
||||
default:
|
||||
return fmt.Errorf("Coordinate update rate limit exceeded, increase SyncCoordinateInterval")
|
||||
return fmt.Errorf("coordinate update rate limit exceeded, increase SyncCoordinateInterval")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -140,6 +140,14 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
t.Fatalf("should return a coordinate but it's nil")
|
||||
}
|
||||
verifyCoordinatesEqual(t, c, arg1.Coord)
|
||||
|
||||
// Finally, send a coordinate with the wrong dimensionality to make sure
|
||||
// there are no panics, and that it gets rejected.
|
||||
arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
|
||||
err = client.Call("Coordinate.Update", &arg2, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") {
|
||||
t.Fatalf("should have failed with an error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinate_Get(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue