From 9aba84eb1ee73203bf0ebffb813e8caaed75e2a7 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 25 May 2017 16:16:37 -0700 Subject: [PATCH 1/3] Updates Serf to pick up small fixes and coordinate NaN/Inf defenses. --- .../hashicorp/serf/coordinate/client.go | 53 ++++++++++++++- .../hashicorp/serf/coordinate/coordinate.go | 30 +++++++-- .../hashicorp/serf/serf/ping_delegate.go | 41 ++++++------ .../github.com/hashicorp/serf/serf/query.go | 19 ++++++ vendor/github.com/hashicorp/serf/serf/serf.go | 66 ++++++++----------- .../hashicorp/serf/serf/snapshot.go | 5 +- vendor/vendor.json | 12 ++-- 7 files changed, 153 insertions(+), 73 deletions(-) diff --git a/vendor/github.com/hashicorp/serf/coordinate/client.go b/vendor/github.com/hashicorp/serf/coordinate/client.go index 613bfff89e..63f6241411 100644 --- a/vendor/github.com/hashicorp/serf/coordinate/client.go +++ b/vendor/github.com/hashicorp/serf/coordinate/client.go @@ -34,10 +34,20 @@ type Client struct { // value to determine how many samples we keep, per node. latencyFilterSamples map[string][]float64 + // stats is used to record events that occur when updating coordinates. + stats ClientStats + // mutex enables safe concurrent access to the client. mutex sync.RWMutex } +// ClientStats is used to record events that occur when updating coordinates. +type ClientStats struct { + // Resets is incremented any time we reset our local coordinate because + // our calculations have resulted in an invalid state. + Resets int +} + // NewClient creates a new Client and verifies the configuration is valid. func NewClient(config *Config) (*Client, error) { if !(config.Dimensionality > 0) { @@ -63,11 +73,16 @@ func (c *Client) GetCoordinate() *Coordinate { } // SetCoordinate forces the client's coordinate to a known state. -func (c *Client) SetCoordinate(coord *Coordinate) { +func (c *Client) SetCoordinate(coord *Coordinate) error { c.mutex.Lock() defer c.mutex.Unlock() + if err := c.checkCoordinate(coord); err != nil { + return err + } + c.coord = coord.Clone() + return nil } // ForgetNode removes any client state for the given node. @@ -78,6 +93,29 @@ func (c *Client) ForgetNode(node string) { delete(c.latencyFilterSamples, node) } +// Stats returns a copy of stats for the client. +func (c *Client) Stats() ClientStats { + c.mutex.Lock() + defer c.mutex.Unlock() + + return c.stats +} + +// checkCoordinate returns an error if the coordinate isn't compatible with +// this client, or if the coordinate itself isn't valid. This assumes the mutex +// has been locked already. +func (c *Client) checkCoordinate(coord *Coordinate) error { + if !c.coord.IsCompatibleWith(coord) { + return fmt.Errorf("dimensions aren't compatible") + } + + if !coord.IsValid() { + return fmt.Errorf("coordinate is invalid") + } + + return nil +} + // latencyFilter applies a simple moving median filter with a new sample for // a node. This assumes that the mutex has been locked already. func (c *Client) latencyFilter(node string, rttSeconds float64) float64 { @@ -159,15 +197,24 @@ func (c *Client) updateGravity() { // Update takes other, a coordinate for another node, and rtt, a round trip // time observation for a ping to that node, and updates the estimated position of // the client's coordinate. Returns the updated coordinate. -func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate { +func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) { c.mutex.Lock() defer c.mutex.Unlock() + if err := c.checkCoordinate(other); err != nil { + return nil, err + } + rttSeconds := c.latencyFilter(node, rtt.Seconds()) c.updateVivaldi(other, rttSeconds) c.updateAdjustment(other, rttSeconds) c.updateGravity() - return c.coord.Clone() + if !c.coord.IsValid() { + c.stats.Resets++ + c.coord = NewCoordinate(c.config) + } + + return c.coord.Clone(), nil } // DistanceTo returns the estimated RTT from the client's coordinate to other, the diff --git a/vendor/github.com/hashicorp/serf/coordinate/coordinate.go b/vendor/github.com/hashicorp/serf/coordinate/coordinate.go index c9194e048b..fbe792c90d 100644 --- a/vendor/github.com/hashicorp/serf/coordinate/coordinate.go +++ b/vendor/github.com/hashicorp/serf/coordinate/coordinate.go @@ -72,6 +72,26 @@ func (c *Coordinate) Clone() *Coordinate { } } +// componentIsValid returns false if a floating point value is a NaN or an +// infinity. +func componentIsValid(f float64) bool { + return !math.IsInf(f, 0) && !math.IsNaN(f) +} + +// IsValid returns false if any component of a coordinate isn't valid, per the +// componentIsValid() helper above. +func (c *Coordinate) IsValid() bool { + for i := range c.Vec { + if !componentIsValid(c.Vec[i]) { + return false + } + } + + return componentIsValid(c.Error) && + componentIsValid(c.Adjustment) && + componentIsValid(c.Height) +} + // IsCompatibleWith checks to see if the two coordinates are compatible // dimensionally. If this returns true then you are guaranteed to not get // any runtime errors operating on them. @@ -122,7 +142,7 @@ func (c *Coordinate) rawDistanceTo(other *Coordinate) float64 { // already been checked to be compatible. func add(vec1 []float64, vec2 []float64) []float64 { ret := make([]float64, len(vec1)) - for i, _ := range ret { + for i := range ret { ret[i] = vec1[i] + vec2[i] } return ret @@ -132,7 +152,7 @@ func add(vec1 []float64, vec2 []float64) []float64 { // dimensions have already been checked to be compatible. func diff(vec1 []float64, vec2 []float64) []float64 { ret := make([]float64, len(vec1)) - for i, _ := range ret { + for i := range ret { ret[i] = vec1[i] - vec2[i] } return ret @@ -141,7 +161,7 @@ func diff(vec1 []float64, vec2 []float64) []float64 { // mul returns vec multiplied by a scalar factor. func mul(vec []float64, factor float64) []float64 { ret := make([]float64, len(vec)) - for i, _ := range vec { + for i := range vec { ret[i] = vec[i] * factor } return ret @@ -150,7 +170,7 @@ func mul(vec []float64, factor float64) []float64 { // magnitude computes the magnitude of the vec. func magnitude(vec []float64) float64 { sum := 0.0 - for i, _ := range vec { + for i := range vec { sum += vec[i] * vec[i] } return math.Sqrt(sum) @@ -168,7 +188,7 @@ func unitVectorAt(vec1 []float64, vec2 []float64) ([]float64, float64) { } // Otherwise, just return a random unit vector. - for i, _ := range ret { + for i := range ret { ret[i] = rand.Float64() - 0.5 } if mag := magnitude(ret); mag > zeroThreshold { diff --git a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go index a482685a20..f5d991b560 100644 --- a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go +++ b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go @@ -62,28 +62,29 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat var coord coordinate.Coordinate if err := dec.Decode(&coord); err != nil { log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err) + return } - // Apply the update. Since this is a coordinate coming from some place - // else we harden this and look for dimensionality problems proactively. + // Apply the update. before := p.serf.coordClient.GetCoordinate() - if before.IsCompatibleWith(&coord) { - after := p.serf.coordClient.Update(other.Name, &coord, rtt) - - // Publish some metrics to give us an idea of how much we are - // adjusting each time we update. - d := float32(before.DistanceTo(after).Seconds() * 1.0e3) - metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d) - - // Cache the coordinate for the other node, and add our own - // to the cache as well since it just got updated. This lets - // users call GetCachedCoordinate with our node name, which is - // more friendly. - p.serf.coordCacheLock.Lock() - p.serf.coordCache[other.Name] = &coord - p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate() - p.serf.coordCacheLock.Unlock() - } else { - log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord) + after, err := p.serf.coordClient.Update(other.Name, &coord, rtt) + if err != nil { + log.Printf("[ERR] serf: Rejected coordinate from %s: %v\n", + other.Name, err) + return } + + // Publish some metrics to give us an idea of how much we are + // adjusting each time we update. + d := float32(before.DistanceTo(after).Seconds() * 1.0e3) + metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d) + + // Cache the coordinate for the other node, and add our own + // to the cache as well since it just got updated. This lets + // users call GetCachedCoordinate with our node name, which is + // more friendly. + p.serf.coordCacheLock.Lock() + p.serf.coordCache[other.Name] = &coord + p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate() + p.serf.coordCacheLock.Unlock() } diff --git a/vendor/github.com/hashicorp/serf/serf/query.go b/vendor/github.com/hashicorp/serf/serf/query.go index 5412821e30..0bdbb35538 100644 --- a/vendor/github.com/hashicorp/serf/serf/query.go +++ b/vendor/github.com/hashicorp/serf/serf/query.go @@ -1,6 +1,7 @@ package serf import ( + "errors" "fmt" "math" "math/rand" @@ -148,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time { // Finished returns if the query is finished running func (r *QueryResponse) Finished() bool { + r.closeLock.Lock() + defer r.closeLock.Unlock() return r.closed || time.Now().After(r.deadline) } @@ -164,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse { return r.respCh } +// sendResponse sends a response on the response channel ensuring the channel is not closed. +func (r *QueryResponse) sendResponse(nr NodeResponse) error { + r.closeLock.Lock() + defer r.closeLock.Unlock() + if r.closed { + return nil + } + select { + case r.respCh <- nr: + r.responses[nr.From] = struct{}{} + default: + return errors.New("serf: Failed to deliver query response, dropping") + } + return nil +} + // NodeResponse is used to represent a single response from a node type NodeResponse struct { From string diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index c7e603b243..3a88d9b0d9 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -241,18 +241,13 @@ func Create(conf *Config) (*Serf, error) { conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) } - if conf.LogOutput != nil && conf.Logger != nil { - return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.") - } - - logDest := conf.LogOutput - if logDest == nil { - logDest = os.Stderr - } - logger := conf.Logger if logger == nil { - logger = log.New(logDest, "", log.LstdFlags) + logOutput := conf.LogOutput + if logOutput == nil { + logOutput = os.Stderr + } + logger = log.New(logOutput, "", log.LstdFlags) } serf := &Serf{ @@ -343,21 +338,15 @@ func Create(conf *Config) (*Serf, error) { // Setup the various broadcast queues, which we use to send our own // custom broadcasts along the gossip channel. serf.broadcasts = &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(serf.members) - }, + NumNodes: serf.NumNodes, RetransmitMult: conf.MemberlistConfig.RetransmitMult, } serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(serf.members) - }, + NumNodes: serf.NumNodes, RetransmitMult: conf.MemberlistConfig.RetransmitMult, } serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(serf.members) - }, + NumNodes: serf.NumNodes, RetransmitMult: conf.MemberlistConfig.RetransmitMult, } @@ -807,13 +796,15 @@ func (s *Serf) Shutdown() error { s.logger.Printf("[WARN] serf: Shutdown without a Leave") } + // Wait to close the shutdown channel until after we've shut down the + // memberlist and its associated network resources, since the shutdown + // channel signals that we are cleaned up outside of Serf. s.state = SerfShutdown - close(s.shutdownCh) - err := s.memberlist.Shutdown() if err != nil { return err } + close(s.shutdownCh) // Wait for the snapshoter to finish if we have one if s.snapshotter != nil { @@ -1323,11 +1314,9 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { } metrics.IncrCounter([]string{"serf", "query_responses"}, 1) - select { - case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}: - query.responses[resp.From] = struct{}{} - default: - s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping") + err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload}) + if err != nil { + s.logger.Printf("[WARN] %v", err) } } } @@ -1387,7 +1376,7 @@ func (s *Serf) resolveNodeConflict() { // Update the counters responses++ - if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port { + if member.Addr.Equal(local.Addr) && member.Port == local.Port { matching++ } } @@ -1664,17 +1653,18 @@ func (s *Serf) Stats() map[string]string { return strconv.FormatUint(v, 10) } stats := map[string]string{ - "members": toString(uint64(len(s.members))), - "failed": toString(uint64(len(s.failedMembers))), - "left": toString(uint64(len(s.leftMembers))), - "health_score": toString(uint64(s.memberlist.GetHealthScore())), - "member_time": toString(uint64(s.clock.Time())), - "event_time": toString(uint64(s.eventClock.Time())), - "query_time": toString(uint64(s.queryClock.Time())), - "intent_queue": toString(uint64(s.broadcasts.NumQueued())), - "event_queue": toString(uint64(s.eventBroadcasts.NumQueued())), - "query_queue": toString(uint64(s.queryBroadcasts.NumQueued())), - "encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()), + "members": toString(uint64(len(s.members))), + "failed": toString(uint64(len(s.failedMembers))), + "left": toString(uint64(len(s.leftMembers))), + "health_score": toString(uint64(s.memberlist.GetHealthScore())), + "member_time": toString(uint64(s.clock.Time())), + "event_time": toString(uint64(s.eventClock.Time())), + "query_time": toString(uint64(s.queryClock.Time())), + "intent_queue": toString(uint64(s.broadcasts.NumQueued())), + "event_queue": toString(uint64(s.eventBroadcasts.NumQueued())), + "query_queue": toString(uint64(s.queryBroadcasts.NumQueued())), + "encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()), + "coordinate_resets": toString(uint64(s.coordClient.Stats().Resets)), } return stats } diff --git a/vendor/github.com/hashicorp/serf/serf/snapshot.go b/vendor/github.com/hashicorp/serf/serf/snapshot.go index 6e1fbd596c..a27fee5e87 100644 --- a/vendor/github.com/hashicorp/serf/serf/snapshot.go +++ b/vendor/github.com/hashicorp/serf/serf/snapshot.go @@ -532,7 +532,10 @@ func (s *Snapshotter) replay() error { s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err) continue } - s.coordClient.SetCoordinate(&coord) + if err := s.coordClient.SetCoordinate(&coord); err != nil { + s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err) + continue + } } else if line == "leave" { // Ignore a leave if we plan on re-joining if s.rejoinAfterLeave { diff --git a/vendor/vendor.json b/vendor/vendor.json index 9afb6cf91f..a1d6b1e3f7 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -668,18 +668,18 @@ "revisionTime": "2015-02-01T20:08:39Z" }, { - "checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=", + "checksumSHA1": "/oss17GO4hXGM7QnUdI3VzcAHzA=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/coordinate", - "revision": "114430d8210835d66defdc31cdc176c58e060005", - "revisionTime": "2016-08-09T01:42:04Z" + "revision": "c2e4be24cdc9031eb0ad869c5d160775efdf7d7a", + "revisionTime": "2017-05-25T23:15:04Z" }, { - "checksumSHA1": "AZ4RoXStVz6qx38ZMZAyC6Gw3Q4=", + "checksumSHA1": "cOk2eJmnkqSyA0utcLlzWMFDwXg=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/serf", - "revision": "c5e26c3704ca774760df65ee8cbb039d9d9ec560", - "revisionTime": "2017-02-08T21:49:39Z" + "revision": "c2e4be24cdc9031eb0ad869c5d160775efdf7d7a", + "revisionTime": "2017-05-25T23:15:04Z" }, { "checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=", From 7a5626230ed2da2d4ebcda0b48933b15d07a7bb9 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 25 May 2017 17:37:16 -0700 Subject: [PATCH 2/3] Prevents bad coordinates and cleans them up in the database. --- consul/coordinate_endpoint.go | 9 +++++- consul/coordinate_endpoint_test.go | 11 ++++++- consul/state/coordinate.go | 12 +++++++ consul/state/coordinate_test.go | 52 ++++++++++++++++++++++++++---- 4 files changed, 76 insertions(+), 8 deletions(-) diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index c297c1088c..e6979cfdcb 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -110,6 +110,13 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct return err } + // Older clients can send coordinates with invalid numeric values like + // NaN and Inf. We guard against these coming in, though newer clients + // should never send these. + if !args.Coord.IsValid() { + return fmt.Errorf("invalid coordinate") + } + // Since this is a coordinate coming from some place else we harden this // and look for dimensionality problems proactively. coord, err := c.srv.serfLAN.GetCoordinate() @@ -117,7 +124,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct return err } if !coord.IsCompatibleWith(args.Coord) { - return fmt.Errorf("rejected bad coordinate: %v", args.Coord) + return fmt.Errorf("incompatible coordinate") } // Fetch the ACL token, if any, and enforce the node policy if enabled. diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index ec88916fcb..3158ceba91 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "math" "math/rand" "os" "reflect" @@ -179,11 +180,19 @@ func TestCoordinate_Update(t *testing.T) { t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped) } + // Send a coordinate with a NaN to make sure that we don't absorb that + // into the database. + arg2.Coord.Vec[0] = math.NaN() + err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out) + if err == nil || !strings.Contains(err.Error(), "invalid coordinate") { + t.Fatalf("should have failed with an error, got %v", err) + } + // Finally, send a coordinate with the wrong dimensionality to make sure // there are no panics, and that it gets rejected. arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec)) err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out) - if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") { + if err == nil || !strings.Contains(err.Error(), "incompatible coordinate") { t.Fatalf("should have failed with an error, got %v", err) } } diff --git a/consul/state/coordinate.go b/consul/state/coordinate.go index 2829053f2f..eaa606d27d 100644 --- a/consul/state/coordinate.go +++ b/consul/state/coordinate.go @@ -22,6 +22,12 @@ func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) { // already got checked on the way in during a batch update. func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error { for _, update := range updates { + // Skip any bad data that may have gotten into the database from + // a bad client in the past. + if !update.Coord.IsValid() { + continue + } + if err := s.tx.Insert("coordinates", update); err != nil { return fmt.Errorf("failed restoring coordinate: %s", err) } @@ -86,6 +92,12 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e // Upsert the coordinates. for _, update := range updates { + // Skip any bad data that may have gotten into the database from + // a bad client in the past. + if !update.Coord.IsValid() { + continue + } + // Since the cleanup of coordinates is tied to deletion of // nodes, we silently drop any updates for nodes that we don't // know about. This might be possible during normal operation diff --git a/consul/state/coordinate_test.go b/consul/state/coordinate_test.go index c8af3a9f41..defbc092f5 100644 --- a/consul/state/coordinate_test.go +++ b/consul/state/coordinate_test.go @@ -1,6 +1,7 @@ package state import ( + "math" "math/rand" "reflect" "testing" @@ -147,6 +148,30 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { t.Fatalf("bad: %#v", coord) } } + + // Apply an invalid update and make sure it gets ignored. + badUpdates := structs.Coordinates{ + &structs.Coordinate{ + Node: "node1", + Coord: &coordinate.Coordinate{Height: math.NaN()}, + }, + } + if err := s.CoordinateBatchUpdate(5, badUpdates); err != nil { + t.Fatalf("err: %s", err) + } + + // Verify we are at the previous state, though the empty batch does bump + // the table index. + idx, coords, err = s.Coordinates(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + if !reflect.DeepEqual(coords, updates) { + t.Fatalf("bad: %#v", coords) + } } func TestStateStore_Coordinate_Cleanup(t *testing.T) { @@ -220,6 +245,18 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { t.Fatalf("err: %s", err) } + // Manually put a bad coordinate in for node3. + testRegisterNode(t, s, 4, "node3") + badUpdate := &structs.Coordinate{ + Node: "node3", + Coord: &coordinate.Coordinate{Height: math.NaN()}, + } + tx := s.db.Txn(true) + if err := tx.Insert("coordinates", badUpdate); err != nil { + t.Fatalf("err: %v", err) + } + tx.Commit() + // Snapshot the coordinates. snap := s.Snapshot() defer snap.Close() @@ -235,12 +272,12 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { Coord: generateRandomCoordinate(), }, } - if err := s.CoordinateBatchUpdate(4, trash); err != nil { + if err := s.CoordinateBatchUpdate(5, trash); err != nil { t.Fatalf("err: %s", err) } // Verify the snapshot. - if idx := snap.LastIndex(); idx != 3 { + if idx := snap.LastIndex(); idx != 4 { t.Fatalf("bad index: %d", idx) } iter, err := snap.Coordinates() @@ -251,7 +288,10 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { for coord := iter.Next(); coord != nil; coord = iter.Next() { dump = append(dump, coord.(*structs.Coordinate)) } - if !reflect.DeepEqual(dump, updates) { + + // The snapshot will have the bad update in it, since we don't filter on + // the read side. + if !reflect.DeepEqual(dump, append(updates, badUpdate)) { t.Fatalf("bad: %#v", dump) } @@ -259,7 +299,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { func() { s := testStateStore(t) restore := s.Restore() - if err := restore.Coordinates(5, dump); err != nil { + if err := restore.Coordinates(6, dump); err != nil { t.Fatalf("err: %s", err) } restore.Commit() @@ -269,7 +309,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - if idx != 5 { + if idx != 6 { t.Fatalf("bad index: %d", idx) } if !reflect.DeepEqual(res, updates) { @@ -278,7 +318,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { // Check that the index was updated (note that it got passed // in during the restore). - if idx := s.maxIndex("coordinates"); idx != 5 { + if idx := s.maxIndex("coordinates"); idx != 6 { t.Fatalf("bad index: %d", idx) } }() From 1021a62ddc3c842ec43c5a12ec6b2047506591e4 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 25 May 2017 22:02:09 -0700 Subject: [PATCH 3/3] Pulls in Serf logger fix. --- vendor/github.com/hashicorp/serf/serf/ping_delegate.go | 9 ++++----- vendor/vendor.json | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go index f5d991b560..a4c44028ed 100644 --- a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go +++ b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go @@ -2,7 +2,6 @@ package serf import ( "bytes" - "log" "time" "github.com/armon/go-metrics" @@ -37,7 +36,7 @@ func (p *pingDelegate) AckPayload() []byte { // The rest of the message is the serialized coordinate. enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil { - log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err) + p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err) } return buf.Bytes() } @@ -52,7 +51,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat // Verify ping version in the header. version := payload[0] if version != PingVersion { - log.Printf("[ERR] serf: Unsupported ping version: %v", version) + p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version) return } @@ -61,7 +60,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat dec := codec.NewDecoder(r, &codec.MsgpackHandle{}) var coord coordinate.Coordinate if err := dec.Decode(&coord); err != nil { - log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err) + p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err) return } @@ -69,7 +68,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat before := p.serf.coordClient.GetCoordinate() after, err := p.serf.coordClient.Update(other.Name, &coord, rtt) if err != nil { - log.Printf("[ERR] serf: Rejected coordinate from %s: %v\n", + p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n", other.Name, err) return } diff --git a/vendor/vendor.json b/vendor/vendor.json index a1d6b1e3f7..770afdf7a6 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -675,11 +675,11 @@ "revisionTime": "2017-05-25T23:15:04Z" }, { - "checksumSHA1": "cOk2eJmnkqSyA0utcLlzWMFDwXg=", + "checksumSHA1": "ZkJRgexeNzNZzpw6YnedwoJl7pE=", "comment": "v0.7.0-66-g6c4672d", "path": "github.com/hashicorp/serf/serf", - "revision": "c2e4be24cdc9031eb0ad869c5d160775efdf7d7a", - "revisionTime": "2017-05-25T23:15:04Z" + "revision": "dfab144618a063232d5753eaa4250a09865106c5", + "revisionTime": "2017-05-26T05:01:28Z" }, { "checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",