mirror of https://github.com/status-im/consul.git
Merge pull request #3076 from hashicorp/reject-nan
Adds defensive checks for NaN and Inf values in network coordinates.
This commit is contained in:
commit
4109e563c2
|
@ -110,6 +110,13 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Older clients can send coordinates with invalid numeric values like
|
||||||
|
// NaN and Inf. We guard against these coming in, though newer clients
|
||||||
|
// should never send these.
|
||||||
|
if !args.Coord.IsValid() {
|
||||||
|
return fmt.Errorf("invalid coordinate")
|
||||||
|
}
|
||||||
|
|
||||||
// Since this is a coordinate coming from some place else we harden this
|
// Since this is a coordinate coming from some place else we harden this
|
||||||
// and look for dimensionality problems proactively.
|
// and look for dimensionality problems proactively.
|
||||||
coord, err := c.srv.serfLAN.GetCoordinate()
|
coord, err := c.srv.serfLAN.GetCoordinate()
|
||||||
|
@ -117,7 +124,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !coord.IsCompatibleWith(args.Coord) {
|
if !coord.IsCompatibleWith(args.Coord) {
|
||||||
return fmt.Errorf("rejected bad coordinate: %v", args.Coord)
|
return fmt.Errorf("incompatible coordinate")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
||||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -179,11 +180,19 @@ func TestCoordinate_Update(t *testing.T) {
|
||||||
t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
|
t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send a coordinate with a NaN to make sure that we don't absorb that
|
||||||
|
// into the database.
|
||||||
|
arg2.Coord.Vec[0] = math.NaN()
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "invalid coordinate") {
|
||||||
|
t.Fatalf("should have failed with an error, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Finally, send a coordinate with the wrong dimensionality to make sure
|
// Finally, send a coordinate with the wrong dimensionality to make sure
|
||||||
// there are no panics, and that it gets rejected.
|
// there are no panics, and that it gets rejected.
|
||||||
arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
|
arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
|
||||||
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
|
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
|
||||||
if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") {
|
if err == nil || !strings.Contains(err.Error(), "incompatible coordinate") {
|
||||||
t.Fatalf("should have failed with an error, got %v", err)
|
t.Fatalf("should have failed with an error, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,12 @@ func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
|
||||||
// already got checked on the way in during a batch update.
|
// already got checked on the way in during a batch update.
|
||||||
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
|
// Skip any bad data that may have gotten into the database from
|
||||||
|
// a bad client in the past.
|
||||||
|
if !update.Coord.IsValid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.tx.Insert("coordinates", update); err != nil {
|
if err := s.tx.Insert("coordinates", update); err != nil {
|
||||||
return fmt.Errorf("failed restoring coordinate: %s", err)
|
return fmt.Errorf("failed restoring coordinate: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -86,6 +92,12 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
|
||||||
|
|
||||||
// Upsert the coordinates.
|
// Upsert the coordinates.
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
|
// Skip any bad data that may have gotten into the database from
|
||||||
|
// a bad client in the past.
|
||||||
|
if !update.Coord.IsValid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Since the cleanup of coordinates is tied to deletion of
|
// Since the cleanup of coordinates is tied to deletion of
|
||||||
// nodes, we silently drop any updates for nodes that we don't
|
// nodes, we silently drop any updates for nodes that we don't
|
||||||
// know about. This might be possible during normal operation
|
// know about. This might be possible during normal operation
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -147,6 +148,30 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||||
t.Fatalf("bad: %#v", coord)
|
t.Fatalf("bad: %#v", coord)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply an invalid update and make sure it gets ignored.
|
||||||
|
badUpdates := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: &coordinate.Coordinate{Height: math.NaN()},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(5, badUpdates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we are at the previous state, though the empty batch does bump
|
||||||
|
// the table index.
|
||||||
|
idx, coords, err = s.Coordinates(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coords, updates) {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
||||||
|
@ -220,6 +245,18 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Manually put a bad coordinate in for node3.
|
||||||
|
testRegisterNode(t, s, 4, "node3")
|
||||||
|
badUpdate := &structs.Coordinate{
|
||||||
|
Node: "node3",
|
||||||
|
Coord: &coordinate.Coordinate{Height: math.NaN()},
|
||||||
|
}
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
if err := tx.Insert("coordinates", badUpdate); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
tx.Commit()
|
||||||
|
|
||||||
// Snapshot the coordinates.
|
// Snapshot the coordinates.
|
||||||
snap := s.Snapshot()
|
snap := s.Snapshot()
|
||||||
defer snap.Close()
|
defer snap.Close()
|
||||||
|
@ -235,12 +272,12 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
Coord: generateRandomCoordinate(),
|
Coord: generateRandomCoordinate(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := s.CoordinateBatchUpdate(4, trash); err != nil {
|
if err := s.CoordinateBatchUpdate(5, trash); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the snapshot.
|
// Verify the snapshot.
|
||||||
if idx := snap.LastIndex(); idx != 3 {
|
if idx := snap.LastIndex(); idx != 4 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
iter, err := snap.Coordinates()
|
iter, err := snap.Coordinates()
|
||||||
|
@ -251,7 +288,10 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
for coord := iter.Next(); coord != nil; coord = iter.Next() {
|
for coord := iter.Next(); coord != nil; coord = iter.Next() {
|
||||||
dump = append(dump, coord.(*structs.Coordinate))
|
dump = append(dump, coord.(*structs.Coordinate))
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(dump, updates) {
|
|
||||||
|
// The snapshot will have the bad update in it, since we don't filter on
|
||||||
|
// the read side.
|
||||||
|
if !reflect.DeepEqual(dump, append(updates, badUpdate)) {
|
||||||
t.Fatalf("bad: %#v", dump)
|
t.Fatalf("bad: %#v", dump)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +299,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
func() {
|
func() {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
restore := s.Restore()
|
restore := s.Restore()
|
||||||
if err := restore.Coordinates(5, dump); err != nil {
|
if err := restore.Coordinates(6, dump); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
restore.Commit()
|
restore.Commit()
|
||||||
|
@ -269,7 +309,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if idx != 5 {
|
if idx != 6 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(res, updates) {
|
if !reflect.DeepEqual(res, updates) {
|
||||||
|
@ -278,7 +318,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
|
|
||||||
// Check that the index was updated (note that it got passed
|
// Check that the index was updated (note that it got passed
|
||||||
// in during the restore).
|
// in during the restore).
|
||||||
if idx := s.maxIndex("coordinates"); idx != 5 {
|
if idx := s.maxIndex("coordinates"); idx != 6 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -34,10 +34,20 @@ type Client struct {
|
||||||
// value to determine how many samples we keep, per node.
|
// value to determine how many samples we keep, per node.
|
||||||
latencyFilterSamples map[string][]float64
|
latencyFilterSamples map[string][]float64
|
||||||
|
|
||||||
|
// stats is used to record events that occur when updating coordinates.
|
||||||
|
stats ClientStats
|
||||||
|
|
||||||
// mutex enables safe concurrent access to the client.
|
// mutex enables safe concurrent access to the client.
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClientStats is used to record events that occur when updating coordinates.
|
||||||
|
type ClientStats struct {
|
||||||
|
// Resets is incremented any time we reset our local coordinate because
|
||||||
|
// our calculations have resulted in an invalid state.
|
||||||
|
Resets int
|
||||||
|
}
|
||||||
|
|
||||||
// NewClient creates a new Client and verifies the configuration is valid.
|
// NewClient creates a new Client and verifies the configuration is valid.
|
||||||
func NewClient(config *Config) (*Client, error) {
|
func NewClient(config *Config) (*Client, error) {
|
||||||
if !(config.Dimensionality > 0) {
|
if !(config.Dimensionality > 0) {
|
||||||
|
@ -63,11 +73,16 @@ func (c *Client) GetCoordinate() *Coordinate {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCoordinate forces the client's coordinate to a known state.
|
// SetCoordinate forces the client's coordinate to a known state.
|
||||||
func (c *Client) SetCoordinate(coord *Coordinate) {
|
func (c *Client) SetCoordinate(coord *Coordinate) error {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
defer c.mutex.Unlock()
|
defer c.mutex.Unlock()
|
||||||
|
|
||||||
|
if err := c.checkCoordinate(coord); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
c.coord = coord.Clone()
|
c.coord = coord.Clone()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForgetNode removes any client state for the given node.
|
// ForgetNode removes any client state for the given node.
|
||||||
|
@ -78,6 +93,29 @@ func (c *Client) ForgetNode(node string) {
|
||||||
delete(c.latencyFilterSamples, node)
|
delete(c.latencyFilterSamples, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stats returns a copy of stats for the client.
|
||||||
|
func (c *Client) Stats() ClientStats {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
|
||||||
|
return c.stats
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkCoordinate returns an error if the coordinate isn't compatible with
|
||||||
|
// this client, or if the coordinate itself isn't valid. This assumes the mutex
|
||||||
|
// has been locked already.
|
||||||
|
func (c *Client) checkCoordinate(coord *Coordinate) error {
|
||||||
|
if !c.coord.IsCompatibleWith(coord) {
|
||||||
|
return fmt.Errorf("dimensions aren't compatible")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !coord.IsValid() {
|
||||||
|
return fmt.Errorf("coordinate is invalid")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// latencyFilter applies a simple moving median filter with a new sample for
|
// latencyFilter applies a simple moving median filter with a new sample for
|
||||||
// a node. This assumes that the mutex has been locked already.
|
// a node. This assumes that the mutex has been locked already.
|
||||||
func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
|
func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
|
||||||
|
@ -159,15 +197,24 @@ func (c *Client) updateGravity() {
|
||||||
// Update takes other, a coordinate for another node, and rtt, a round trip
|
// Update takes other, a coordinate for another node, and rtt, a round trip
|
||||||
// time observation for a ping to that node, and updates the estimated position of
|
// time observation for a ping to that node, and updates the estimated position of
|
||||||
// the client's coordinate. Returns the updated coordinate.
|
// the client's coordinate. Returns the updated coordinate.
|
||||||
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate {
|
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
defer c.mutex.Unlock()
|
defer c.mutex.Unlock()
|
||||||
|
|
||||||
|
if err := c.checkCoordinate(other); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
rttSeconds := c.latencyFilter(node, rtt.Seconds())
|
rttSeconds := c.latencyFilter(node, rtt.Seconds())
|
||||||
c.updateVivaldi(other, rttSeconds)
|
c.updateVivaldi(other, rttSeconds)
|
||||||
c.updateAdjustment(other, rttSeconds)
|
c.updateAdjustment(other, rttSeconds)
|
||||||
c.updateGravity()
|
c.updateGravity()
|
||||||
return c.coord.Clone()
|
if !c.coord.IsValid() {
|
||||||
|
c.stats.Resets++
|
||||||
|
c.coord = NewCoordinate(c.config)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.coord.Clone(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DistanceTo returns the estimated RTT from the client's coordinate to other, the
|
// DistanceTo returns the estimated RTT from the client's coordinate to other, the
|
||||||
|
|
|
@ -72,6 +72,26 @@ func (c *Coordinate) Clone() *Coordinate {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// componentIsValid returns false if a floating point value is a NaN or an
|
||||||
|
// infinity.
|
||||||
|
func componentIsValid(f float64) bool {
|
||||||
|
return !math.IsInf(f, 0) && !math.IsNaN(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsValid returns false if any component of a coordinate isn't valid, per the
|
||||||
|
// componentIsValid() helper above.
|
||||||
|
func (c *Coordinate) IsValid() bool {
|
||||||
|
for i := range c.Vec {
|
||||||
|
if !componentIsValid(c.Vec[i]) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return componentIsValid(c.Error) &&
|
||||||
|
componentIsValid(c.Adjustment) &&
|
||||||
|
componentIsValid(c.Height)
|
||||||
|
}
|
||||||
|
|
||||||
// IsCompatibleWith checks to see if the two coordinates are compatible
|
// IsCompatibleWith checks to see if the two coordinates are compatible
|
||||||
// dimensionally. If this returns true then you are guaranteed to not get
|
// dimensionally. If this returns true then you are guaranteed to not get
|
||||||
// any runtime errors operating on them.
|
// any runtime errors operating on them.
|
||||||
|
@ -122,7 +142,7 @@ func (c *Coordinate) rawDistanceTo(other *Coordinate) float64 {
|
||||||
// already been checked to be compatible.
|
// already been checked to be compatible.
|
||||||
func add(vec1 []float64, vec2 []float64) []float64 {
|
func add(vec1 []float64, vec2 []float64) []float64 {
|
||||||
ret := make([]float64, len(vec1))
|
ret := make([]float64, len(vec1))
|
||||||
for i, _ := range ret {
|
for i := range ret {
|
||||||
ret[i] = vec1[i] + vec2[i]
|
ret[i] = vec1[i] + vec2[i]
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
|
@ -132,7 +152,7 @@ func add(vec1 []float64, vec2 []float64) []float64 {
|
||||||
// dimensions have already been checked to be compatible.
|
// dimensions have already been checked to be compatible.
|
||||||
func diff(vec1 []float64, vec2 []float64) []float64 {
|
func diff(vec1 []float64, vec2 []float64) []float64 {
|
||||||
ret := make([]float64, len(vec1))
|
ret := make([]float64, len(vec1))
|
||||||
for i, _ := range ret {
|
for i := range ret {
|
||||||
ret[i] = vec1[i] - vec2[i]
|
ret[i] = vec1[i] - vec2[i]
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
|
@ -141,7 +161,7 @@ func diff(vec1 []float64, vec2 []float64) []float64 {
|
||||||
// mul returns vec multiplied by a scalar factor.
|
// mul returns vec multiplied by a scalar factor.
|
||||||
func mul(vec []float64, factor float64) []float64 {
|
func mul(vec []float64, factor float64) []float64 {
|
||||||
ret := make([]float64, len(vec))
|
ret := make([]float64, len(vec))
|
||||||
for i, _ := range vec {
|
for i := range vec {
|
||||||
ret[i] = vec[i] * factor
|
ret[i] = vec[i] * factor
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
|
@ -150,7 +170,7 @@ func mul(vec []float64, factor float64) []float64 {
|
||||||
// magnitude computes the magnitude of the vec.
|
// magnitude computes the magnitude of the vec.
|
||||||
func magnitude(vec []float64) float64 {
|
func magnitude(vec []float64) float64 {
|
||||||
sum := 0.0
|
sum := 0.0
|
||||||
for i, _ := range vec {
|
for i := range vec {
|
||||||
sum += vec[i] * vec[i]
|
sum += vec[i] * vec[i]
|
||||||
}
|
}
|
||||||
return math.Sqrt(sum)
|
return math.Sqrt(sum)
|
||||||
|
@ -168,7 +188,7 @@ func unitVectorAt(vec1 []float64, vec2 []float64) ([]float64, float64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, just return a random unit vector.
|
// Otherwise, just return a random unit vector.
|
||||||
for i, _ := range ret {
|
for i := range ret {
|
||||||
ret[i] = rand.Float64() - 0.5
|
ret[i] = rand.Float64() - 0.5
|
||||||
}
|
}
|
||||||
if mag := magnitude(ret); mag > zeroThreshold {
|
if mag := magnitude(ret); mag > zeroThreshold {
|
||||||
|
|
|
@ -2,7 +2,6 @@ package serf
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
@ -37,7 +36,7 @@ func (p *pingDelegate) AckPayload() []byte {
|
||||||
// The rest of the message is the serialized coordinate.
|
// The rest of the message is the serialized coordinate.
|
||||||
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
|
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
|
||||||
if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
|
if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil {
|
||||||
log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
|
p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
|
||||||
}
|
}
|
||||||
return buf.Bytes()
|
return buf.Bytes()
|
||||||
}
|
}
|
||||||
|
@ -52,7 +51,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
|
||||||
// Verify ping version in the header.
|
// Verify ping version in the header.
|
||||||
version := payload[0]
|
version := payload[0]
|
||||||
if version != PingVersion {
|
if version != PingVersion {
|
||||||
log.Printf("[ERR] serf: Unsupported ping version: %v", version)
|
p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,14 +60,18 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
|
||||||
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
|
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
|
||||||
var coord coordinate.Coordinate
|
var coord coordinate.Coordinate
|
||||||
if err := dec.Decode(&coord); err != nil {
|
if err := dec.Decode(&coord); err != nil {
|
||||||
log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
|
p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply the update. Since this is a coordinate coming from some place
|
// Apply the update.
|
||||||
// else we harden this and look for dimensionality problems proactively.
|
|
||||||
before := p.serf.coordClient.GetCoordinate()
|
before := p.serf.coordClient.GetCoordinate()
|
||||||
if before.IsCompatibleWith(&coord) {
|
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
|
||||||
after := p.serf.coordClient.Update(other.Name, &coord, rtt)
|
if err != nil {
|
||||||
|
p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
|
||||||
|
other.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Publish some metrics to give us an idea of how much we are
|
// Publish some metrics to give us an idea of how much we are
|
||||||
// adjusting each time we update.
|
// adjusting each time we update.
|
||||||
|
@ -83,7 +86,4 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
|
||||||
p.serf.coordCache[other.Name] = &coord
|
p.serf.coordCache[other.Name] = &coord
|
||||||
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
|
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
|
||||||
p.serf.coordCacheLock.Unlock()
|
p.serf.coordCacheLock.Unlock()
|
||||||
} else {
|
|
||||||
log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package serf
|
package serf
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -148,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time {
|
||||||
|
|
||||||
// Finished returns if the query is finished running
|
// Finished returns if the query is finished running
|
||||||
func (r *QueryResponse) Finished() bool {
|
func (r *QueryResponse) Finished() bool {
|
||||||
|
r.closeLock.Lock()
|
||||||
|
defer r.closeLock.Unlock()
|
||||||
return r.closed || time.Now().After(r.deadline)
|
return r.closed || time.Now().After(r.deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
|
||||||
return r.respCh
|
return r.respCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sendResponse sends a response on the response channel ensuring the channel is not closed.
|
||||||
|
func (r *QueryResponse) sendResponse(nr NodeResponse) error {
|
||||||
|
r.closeLock.Lock()
|
||||||
|
defer r.closeLock.Unlock()
|
||||||
|
if r.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case r.respCh <- nr:
|
||||||
|
r.responses[nr.From] = struct{}{}
|
||||||
|
default:
|
||||||
|
return errors.New("serf: Failed to deliver query response, dropping")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// NodeResponse is used to represent a single response from a node
|
// NodeResponse is used to represent a single response from a node
|
||||||
type NodeResponse struct {
|
type NodeResponse struct {
|
||||||
From string
|
From string
|
||||||
|
|
|
@ -241,18 +241,13 @@ func Create(conf *Config) (*Serf, error) {
|
||||||
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
|
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf.LogOutput != nil && conf.Logger != nil {
|
|
||||||
return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
|
|
||||||
}
|
|
||||||
|
|
||||||
logDest := conf.LogOutput
|
|
||||||
if logDest == nil {
|
|
||||||
logDest = os.Stderr
|
|
||||||
}
|
|
||||||
|
|
||||||
logger := conf.Logger
|
logger := conf.Logger
|
||||||
if logger == nil {
|
if logger == nil {
|
||||||
logger = log.New(logDest, "", log.LstdFlags)
|
logOutput := conf.LogOutput
|
||||||
|
if logOutput == nil {
|
||||||
|
logOutput = os.Stderr
|
||||||
|
}
|
||||||
|
logger = log.New(logOutput, "", log.LstdFlags)
|
||||||
}
|
}
|
||||||
|
|
||||||
serf := &Serf{
|
serf := &Serf{
|
||||||
|
@ -343,21 +338,15 @@ func Create(conf *Config) (*Serf, error) {
|
||||||
// Setup the various broadcast queues, which we use to send our own
|
// Setup the various broadcast queues, which we use to send our own
|
||||||
// custom broadcasts along the gossip channel.
|
// custom broadcasts along the gossip channel.
|
||||||
serf.broadcasts = &memberlist.TransmitLimitedQueue{
|
serf.broadcasts = &memberlist.TransmitLimitedQueue{
|
||||||
NumNodes: func() int {
|
NumNodes: serf.NumNodes,
|
||||||
return len(serf.members)
|
|
||||||
},
|
|
||||||
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
|
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
|
||||||
}
|
}
|
||||||
serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
|
serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
|
||||||
NumNodes: func() int {
|
NumNodes: serf.NumNodes,
|
||||||
return len(serf.members)
|
|
||||||
},
|
|
||||||
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
|
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
|
||||||
}
|
}
|
||||||
serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
|
serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
|
||||||
NumNodes: func() int {
|
NumNodes: serf.NumNodes,
|
||||||
return len(serf.members)
|
|
||||||
},
|
|
||||||
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
|
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -807,13 +796,15 @@ func (s *Serf) Shutdown() error {
|
||||||
s.logger.Printf("[WARN] serf: Shutdown without a Leave")
|
s.logger.Printf("[WARN] serf: Shutdown without a Leave")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait to close the shutdown channel until after we've shut down the
|
||||||
|
// memberlist and its associated network resources, since the shutdown
|
||||||
|
// channel signals that we are cleaned up outside of Serf.
|
||||||
s.state = SerfShutdown
|
s.state = SerfShutdown
|
||||||
close(s.shutdownCh)
|
|
||||||
|
|
||||||
err := s.memberlist.Shutdown()
|
err := s.memberlist.Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
close(s.shutdownCh)
|
||||||
|
|
||||||
// Wait for the snapshoter to finish if we have one
|
// Wait for the snapshoter to finish if we have one
|
||||||
if s.snapshotter != nil {
|
if s.snapshotter != nil {
|
||||||
|
@ -1323,11 +1314,9 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
|
metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
|
||||||
select {
|
err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
|
||||||
case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
|
if err != nil {
|
||||||
query.responses[resp.From] = struct{}{}
|
s.logger.Printf("[WARN] %v", err)
|
||||||
default:
|
|
||||||
s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1387,7 +1376,7 @@ func (s *Serf) resolveNodeConflict() {
|
||||||
|
|
||||||
// Update the counters
|
// Update the counters
|
||||||
responses++
|
responses++
|
||||||
if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
|
if member.Addr.Equal(local.Addr) && member.Port == local.Port {
|
||||||
matching++
|
matching++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1675,6 +1664,7 @@ func (s *Serf) Stats() map[string]string {
|
||||||
"event_queue": toString(uint64(s.eventBroadcasts.NumQueued())),
|
"event_queue": toString(uint64(s.eventBroadcasts.NumQueued())),
|
||||||
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
|
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
|
||||||
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
|
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
|
||||||
|
"coordinate_resets": toString(uint64(s.coordClient.Stats().Resets)),
|
||||||
}
|
}
|
||||||
return stats
|
return stats
|
||||||
}
|
}
|
||||||
|
|
|
@ -532,7 +532,10 @@ func (s *Snapshotter) replay() error {
|
||||||
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
|
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.coordClient.SetCoordinate(&coord)
|
if err := s.coordClient.SetCoordinate(&coord); err != nil {
|
||||||
|
s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
} else if line == "leave" {
|
} else if line == "leave" {
|
||||||
// Ignore a leave if we plan on re-joining
|
// Ignore a leave if we plan on re-joining
|
||||||
if s.rejoinAfterLeave {
|
if s.rejoinAfterLeave {
|
||||||
|
|
|
@ -668,18 +668,18 @@
|
||||||
"revisionTime": "2015-02-01T20:08:39Z"
|
"revisionTime": "2015-02-01T20:08:39Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=",
|
"checksumSHA1": "/oss17GO4hXGM7QnUdI3VzcAHzA=",
|
||||||
"comment": "v0.7.0-66-g6c4672d",
|
"comment": "v0.7.0-66-g6c4672d",
|
||||||
"path": "github.com/hashicorp/serf/coordinate",
|
"path": "github.com/hashicorp/serf/coordinate",
|
||||||
"revision": "114430d8210835d66defdc31cdc176c58e060005",
|
"revision": "c2e4be24cdc9031eb0ad869c5d160775efdf7d7a",
|
||||||
"revisionTime": "2016-08-09T01:42:04Z"
|
"revisionTime": "2017-05-25T23:15:04Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "AZ4RoXStVz6qx38ZMZAyC6Gw3Q4=",
|
"checksumSHA1": "ZkJRgexeNzNZzpw6YnedwoJl7pE=",
|
||||||
"comment": "v0.7.0-66-g6c4672d",
|
"comment": "v0.7.0-66-g6c4672d",
|
||||||
"path": "github.com/hashicorp/serf/serf",
|
"path": "github.com/hashicorp/serf/serf",
|
||||||
"revision": "c5e26c3704ca774760df65ee8cbb039d9d9ec560",
|
"revision": "dfab144618a063232d5753eaa4250a09865106c5",
|
||||||
"revisionTime": "2017-02-08T21:49:39Z"
|
"revisionTime": "2017-05-26T05:01:28Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",
|
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",
|
||||||
|
|
Loading…
Reference in New Issue