From 708957a982429df1727f27a7f9dc307ac1f3d3b2 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Thu, 8 Oct 2020 15:07:10 -0400 Subject: [PATCH] chore: update raft to v1.2.0 (#8822) --- go.mod | 2 +- go.sum | 4 +-- vendor/github.com/hashicorp/raft/CHANGELOG.md | 19 +++++++++++ vendor/github.com/hashicorp/raft/Makefile | 4 +-- vendor/github.com/hashicorp/raft/README.md | 2 +- vendor/github.com/hashicorp/raft/api.go | 24 +++++++------- vendor/github.com/hashicorp/raft/config.go | 4 --- .../hashicorp/raft/file_snapshot.go | 24 ++++++++++---- vendor/github.com/hashicorp/raft/future.go | 13 +++++--- .../hashicorp/raft/inmem_snapshot.go | 6 ++-- .../hashicorp/raft/inmem_transport.go | 4 +-- .../hashicorp/raft/net_transport.go | 2 +- vendor/github.com/hashicorp/raft/raft.go | 32 ++++++++++++++----- .../github.com/hashicorp/raft/replication.go | 10 ++++++ vendor/github.com/hashicorp/raft/snapshot.go | 1 + .../hashicorp/raft/tcp_transport.go | 2 +- vendor/github.com/hashicorp/raft/testing.go | 26 ++++++++------- vendor/github.com/hashicorp/raft/util.go | 19 +++++++++++ vendor/modules.txt | 2 +- 19 files changed, 141 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index 4308e153b8..3565e45608 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/hashicorp/hil v0.0.0-20160711231837-1e86c6b523c5 github.com/hashicorp/memberlist v0.2.2 github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 - github.com/hashicorp/raft v1.1.2 + github.com/hashicorp/raft v1.2.0 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/serf v0.9.5 github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 diff --git a/go.sum b/go.sum index e585b74707..0c7a86f9db 100644 --- a/go.sum +++ b/go.sum @@ -284,8 +284,8 @@ github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= -github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c= -github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0= +github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM= diff --git a/vendor/github.com/hashicorp/raft/CHANGELOG.md b/vendor/github.com/hashicorp/raft/CHANGELOG.md index a2c3a0ac82..b002e48046 100644 --- a/vendor/github.com/hashicorp/raft/CHANGELOG.md +++ b/vendor/github.com/hashicorp/raft/CHANGELOG.md @@ -1,3 +1,22 @@ +# UNRELEASED + +IMPROVEMENTS + +* Remove `StartAsLeader` configuration option [[GH-364](https://github.com/hashicorp/raft/pull/386)] +* Allow futures to react to `Shutdown()` to prevent a deadlock with `takeSnapshot()` [[GH-390](https://github.com/hashicorp/raft/pull/390)] +* Prevent non-voters from becoming eligible for leadership elections [[GH-398](https://github.com/hashicorp/raft/pull/398)] +* Remove an unneeded `io.Copy` from snapshot writes [[GH-399](https://github.com/hashicorp/raft/pull/399)] +* Log decoded candidate address in `duplicate requestVote` warning [[GH-400](https://github.com/hashicorp/raft/pull/400)] +* Prevent starting a TCP transport when IP address is `nil` [[GH-403](https://github.com/hashicorp/raft/pull/403)] +* Reject leadership transfer requests when in candidate state to prevent indefinite blocking while unable to elect a leader [[GH-413](https://github.com/hashicorp/raft/pull/413)] +* Add labels for metric metadata to reduce cardinality of metric names [[GH-409](https://github.com/hashicorp/raft/pull/409)] +* Add peers metric [[GH-413](https://github.com/hashicorp/raft/pull/431)] + +BUG FIXES + +* Make `LeaderCh` always deliver the latest leadership transition [[GH-384](https://github.com/hashicorp/raft/pull/384)] +* Handle updating an existing peer in `startStopReplication` [[GH-419](https://github.com/hashicorp/raft/pull/419)] + # 1.1.2 (January 17th, 2020) FEATURES diff --git a/vendor/github.com/hashicorp/raft/Makefile b/vendor/github.com/hashicorp/raft/Makefile index 8ce7c06ab1..60e6a557e5 100644 --- a/vendor/github.com/hashicorp/raft/Makefile +++ b/vendor/github.com/hashicorp/raft/Makefile @@ -16,8 +16,8 @@ endif TEST_RESULTS_DIR?=/tmp/test-results test: - go test $(TESTARGS) -timeout=60s -race . - go test $(TESTARGS) -timeout=60s -tags batchtest -race . + GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race . + GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race . integ: test INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ . diff --git a/vendor/github.com/hashicorp/raft/README.md b/vendor/github.com/hashicorp/raft/README.md index dc8bb6448c..7f6ff83074 100644 --- a/vendor/github.com/hashicorp/raft/README.md +++ b/vendor/github.com/hashicorp/raft/README.md @@ -1,4 +1,4 @@ -raft [![Build Status](https://travis-ci.org/hashicorp/raft.png)](https://travis-ci.org/hashicorp/raft) [![CircleCI](https://circleci.com/gh/hashicorp/raft.svg?style=svg)](https://circleci.com/gh/hashicorp/raft) +raft [![CircleCI](https://circleci.com/gh/hashicorp/raft.svg?style=svg)](https://circleci.com/gh/hashicorp/raft) ==== raft is a [Go](http://www.golang.org) library that manages a replicated diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index 03f43601d4..1348453f55 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -503,7 +503,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna fsm: fsm, fsmMutateCh: make(chan interface{}, 128), fsmSnapshotCh: make(chan *reqSnapshotFuture), - leaderCh: make(chan bool), + leaderCh: make(chan bool, 1), localID: localID, localAddr: localAddr, logger: logger, @@ -527,13 +527,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // Initialize as a follower. r.setState(Follower) - // Start as leader if specified. This should only be used - // for testing purposes. - if conf.StartAsLeader { - r.setState(Leader) - r.setLeader(r.localAddr) - } - // Restore the current term and the last log. r.setCurrentTerm(currentTerm) r.setLastLog(lastLog.Index, lastLog.Term) @@ -959,10 +952,17 @@ func (r *Raft) State() RaftState { return r.getState() } -// LeaderCh is used to get a channel which delivers signals on -// acquiring or losing leadership. It sends true if we become -// the leader, and false if we lose it. The channel is not buffered, -// and does not block on writes. +// LeaderCh is used to get a channel which delivers signals on acquiring or +// losing leadership. It sends true if we become the leader, and false if we +// lose it. +// +// Receivers can expect to receive a notification only if leadership +// transition has occured. +// +// If receivers aren't ready for the signal, signals may drop and only the +// latest leadership transition. For example, if a receiver receives subsequent +// `true` values, they may deduce that leadership was lost and regained while +// the the receiver was processing first leadership transition. func (r *Raft) LeaderCh() <-chan bool { return r.leaderCh } diff --git a/vendor/github.com/hashicorp/raft/config.go b/vendor/github.com/hashicorp/raft/config.go index 3ec564df44..af71f691a9 100644 --- a/vendor/github.com/hashicorp/raft/config.go +++ b/vendor/github.com/hashicorp/raft/config.go @@ -178,10 +178,6 @@ type Config struct { // step down as leader. LeaderLeaseTimeout time.Duration - // StartAsLeader forces Raft to start in the leader state. This should - // never be used except for testing purposes, as it can cause a split-brain. - StartAsLeader bool - // The unique ID for this server across all time. When running with // ProtocolVersion < 3, you must set this to be the same as the network // address of your transport. diff --git a/vendor/github.com/hashicorp/raft/file_snapshot.go b/vendor/github.com/hashicorp/raft/file_snapshot.go index d1604493e0..e4d1ea4f9e 100644 --- a/vendor/github.com/hashicorp/raft/file_snapshot.go +++ b/vendor/github.com/hashicorp/raft/file_snapshot.go @@ -5,7 +5,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/hashicorp/go-hclog" "hash" "hash/crc64" "io" @@ -16,6 +15,8 @@ import ( "sort" "strings" "time" + + hclog "github.com/hashicorp/go-hclog" ) const ( @@ -32,6 +33,10 @@ type FileSnapshotStore struct { path string retain int logger hclog.Logger + + // noSync, if true, skips crash-safe file fsync api calls. + // It's a private field, only used in testing + noSync bool } type snapMetaSlice []*fileSnapshotMeta @@ -44,6 +49,8 @@ type FileSnapshotSink struct { parentDir string meta fileSnapshotMeta + noSync bool + stateFile *os.File stateHash hash.Hash64 buffered *bufio.Writer @@ -172,6 +179,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, logger: f.logger, dir: path, parentDir: f.path, + noSync: f.noSync, meta: fileSnapshotMeta{ SnapshotMeta: SnapshotMeta{ Version: version, @@ -414,7 +422,7 @@ func (s *FileSnapshotSink) Close() error { return err } - if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems + if !s.noSync && runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems parentFH, err := os.Open(s.parentDir) defer parentFH.Close() if err != nil { @@ -462,8 +470,10 @@ func (s *FileSnapshotSink) finalize() error { } // Sync to force fsync to disk - if err := s.stateFile.Sync(); err != nil { - return err + if !s.noSync { + if err := s.stateFile.Sync(); err != nil { + return err + } } // Get the file size @@ -510,8 +520,10 @@ func (s *FileSnapshotSink) writeMeta() error { return err } - if err = fh.Sync(); err != nil { - return err + if !s.noSync { + if err = fh.Sync(); err != nil { + return err + } } return nil diff --git a/vendor/github.com/hashicorp/raft/future.go b/vendor/github.com/hashicorp/raft/future.go index 6346b453b1..6de2daea4f 100644 --- a/vendor/github.com/hashicorp/raft/future.go +++ b/vendor/github.com/hashicorp/raft/future.go @@ -84,9 +84,10 @@ func (e errorFuture) Index() uint64 { // deferError can be embedded to allow a future // to provide an error in the future. type deferError struct { - err error - errCh chan error - responded bool + err error + errCh chan error + responded bool + ShutdownCh chan struct{} } func (d *deferError) init() { @@ -103,7 +104,11 @@ func (d *deferError) Error() error { if d.errCh == nil { panic("waiting for response on nil channel") } - d.err = <-d.errCh + select { + case d.err = <-d.errCh: + case <-d.ShutdownCh: + d.err = ErrRaftShutdown + } return d.err } diff --git a/vendor/github.com/hashicorp/raft/inmem_snapshot.go b/vendor/github.com/hashicorp/raft/inmem_snapshot.go index 641d9d8172..5e0c202fa0 100644 --- a/vendor/github.com/hashicorp/raft/inmem_snapshot.go +++ b/vendor/github.com/hashicorp/raft/inmem_snapshot.go @@ -90,9 +90,9 @@ func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, erro // Write appends the given bytes to the snapshot contents func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) { - written, err := io.Copy(s.contents, bytes.NewReader(p)) - s.meta.Size += written - return int(written), err + written, err := s.contents.Write(p) + s.meta.Size += int64(written) + return written, err } // Close updates the Size and is otherwise a no-op diff --git a/vendor/github.com/hashicorp/raft/inmem_transport.go b/vendor/github.com/hashicorp/raft/inmem_transport.go index 6196dfa404..b5bdecc73c 100644 --- a/vendor/github.com/hashicorp/raft/inmem_transport.go +++ b/vendor/github.com/hashicorp/raft/inmem_transport.go @@ -63,7 +63,7 @@ func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (Se // NewInmemTransport is used to initialize a new transport // and generates a random local address if none is specified func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) { - return NewInmemTransportWithTimeout(addr, 50*time.Millisecond) + return NewInmemTransportWithTimeout(addr, 500*time.Millisecond) } // SetHeartbeatHandler is used to set optional fast-path for @@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re } // Send the RPC over - respCh := make(chan RPCResponse) + respCh := make(chan RPCResponse, 1) req := RPC{ Command: args, Reader: r, diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index 6092cafbcf..a530ffd251 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -319,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv if n.serverAddressProvider != nil { serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) if err != nil { - n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err) + n.logger.Warn("unable to get address for server, using fallback address", "id", id, "fallback", target, "error", err) } else { return serverAddressOverride } diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index fb8f4b308e..320ecb1ed9 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -311,6 +311,10 @@ func (r *Raft) runCandidate() { // Reject any restores since we are not the leader r.respond(ErrNotLeader) + case r := <-r.leadershipTransferCh: + // Reject any operations since we are not the leader + r.respond(ErrNotLeader) + case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) @@ -364,7 +368,7 @@ func (r *Raft) runLeader() { metrics.IncrCounter([]string{"raft", "state", "leader"}, 1) // Notify that we are the leader - asyncNotifyBool(r.leaderCh, true) + overrideNotifyBool(r.leaderCh, true) // Push to the notify channel if given if notify := r.conf.NotifyCh; notify != nil { @@ -420,7 +424,7 @@ func (r *Raft) runLeader() { r.leaderLock.Unlock() // Notify that we are not the leader - asyncNotifyBool(r.leaderCh, false) + overrideNotifyBool(r.leaderCh, false) // Push to the notify channel if given if notify := r.conf.NotifyCh; notify != nil { @@ -469,10 +473,13 @@ func (r *Raft) startStopReplication() { if server.ID == r.localID { continue } + inConfig[server.ID] = true - if _, ok := r.leaderState.replState[server.ID]; !ok { + + s, ok := r.leaderState.replState[server.ID] + if !ok { r.logger.Info("added peer, starting replication", "peer", server.ID) - s := &followerReplication{ + s = &followerReplication{ peer: server, commitment: r.leaderState.commitment, stopCh: make(chan uint64, 1), @@ -485,10 +492,14 @@ func (r *Raft) startStopReplication() { notifyCh: make(chan struct{}, 1), stepDown: r.leaderState.stepDown, } + r.leaderState.replState[server.ID] = s r.goFunc(func() { r.replicate(s) }) asyncNotifyCh(s.triggerCh) r.observe(PeerObservation{Peer: server, Removed: false}) + } else if ok && s.peer.Address != server.Address { + r.logger.Info("updating peer", "peer", server.ID) + s.peer = server } } @@ -504,6 +515,9 @@ func (r *Raft) startStopReplication() { delete(r.leaderState.replState, serverID) r.observe(PeerObservation{Peer: repl.peer, Removed: true}) } + + // Update peers metric + metrics.SetGauge([]string{"raft", "peers"}, float32(len(r.configurations.latest.Servers))) } // configurationChangeChIfStable returns r.configurationChangeCh if it's safe @@ -982,6 +996,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { // Restore the snapshot into the FSM. If this fails we are in a // bad state so we panic to take ourselves out. fsm := &restoreFuture{ID: sink.ID()} + fsm.ShutdownCh = r.shutdownCh fsm.init() select { case r.fsmMutateCh <- fsm: @@ -1451,7 +1466,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { if lastVoteTerm == req.Term && lastVoteCandBytes != nil { r.logger.Info("duplicate requestVote for same term", "term", req.Term) if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { - r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate) + r.logger.Warn("duplicate requestVote from", "candidate", candidate) resp.Granted = true } return @@ -1576,6 +1591,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Restore snapshot future := &restoreFuture{ID: sink.ID()} + future.ShutdownCh = r.shutdownCh future.init() select { case r.fsmMutateCh <- future: @@ -1732,13 +1748,13 @@ func (r *Raft) lookupServer(id ServerID) *Server { return nil } -// pickServer returns the follower that is most up to date. Because it accesses -// leaderstate, it should only be called from the leaderloop. +// pickServer returns the follower that is most up to date and participating in quorum. +// Because it accesses leaderstate, it should only be called from the leaderloop. func (r *Raft) pickServer() *Server { var pick *Server var current uint64 for _, server := range r.configurations.latest.Servers { - if server.ID == r.localID { + if server.ID == r.localID || server.Suffrage != Voter { continue } state, ok := r.leaderState.replState[server.ID] diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index 1e2f2db701..30258a007b 100644 --- a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -326,6 +326,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { s.failures++ return false, err } + labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}} + metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels) + // Duplicated information. Kept for backward compatibility. metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start) // Check for a newer term, stop running @@ -386,6 +389,9 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { } else { s.setLastContact() failures = 0 + labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}} + metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels) + // Duplicated information. Kept for backward compatibility. metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start) s.notifyAll(resp.Success) } @@ -572,6 +578,10 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64 // appendStats is used to emit stats about an AppendEntries invocation. func appendStats(peer string, start time.Time, logs float32) { + labels := []metrics.Label{{Name: "peer_id", Value: peer}} + metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels) + metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels) + // Duplicated information. Kept for backward compatibility. metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start) metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs) } diff --git a/vendor/github.com/hashicorp/raft/snapshot.go b/vendor/github.com/hashicorp/raft/snapshot.go index f4c3945145..805a09d707 100644 --- a/vendor/github.com/hashicorp/raft/snapshot.go +++ b/vendor/github.com/hashicorp/raft/snapshot.go @@ -146,6 +146,7 @@ func (r *Raft) takeSnapshot() (string, error) { // We have to use the future here to safely get this information since // it is owned by the main thread. configReq := &configurationsFuture{} + configReq.ShutdownCh = r.shutdownCh configReq.init() select { case r.configurationsCh <- configReq: diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index ff40a57bcd..3bd4219587 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -81,7 +81,7 @@ func newTCPTransport(bindAddr string, list.Close() return nil, errNotTCP } - if addr.IP.IsUnspecified() { + if addr.IP == nil || addr.IP.IsUnspecified() { list.Close() return nil, errNotAdvertisable } diff --git a/vendor/github.com/hashicorp/raft/testing.go b/vendor/github.com/hashicorp/raft/testing.go index 70fd7b7955..a782dd3148 100644 --- a/vendor/github.com/hashicorp/raft/testing.go +++ b/vendor/github.com/hashicorp/raft/testing.go @@ -2,6 +2,7 @@ package raft import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -276,19 +277,14 @@ func (c *cluster) Close() { // or a timeout occurs. It is possible to set a filter to look for specific // observations. Setting timeout to 0 means that it will wait forever until a // non-filtered observation is made. -func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan struct{} { +func (c *cluster) WaitEventChan(ctx context.Context, filter FilterFn) <-chan struct{} { ch := make(chan struct{}) go func() { defer close(ch) - var timeoutCh <-chan time.Time - if timeout > 0 { - timeoutCh = time.After(timeout) - } for { select { - case <-timeoutCh: + case <-ctx.Done(): return - case o, ok := <-c.observationCh: if !ok || filter == nil || filter(&o) { return @@ -304,11 +300,13 @@ func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan s // observations. Setting timeout to 0 means that it will wait forever until a // non-filtered observation is made or a test failure is signaled. func (c *cluster) WaitEvent(filter FilterFn, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + eventCh := c.WaitEventChan(ctx, filter) select { case <-c.failedCh: c.t.FailNow() - - case <-c.WaitEventChan(filter, timeout): + case <-eventCh: } } @@ -319,7 +317,9 @@ func (c *cluster) WaitForReplication(fsmLength int) { CHECK: for { - ch := c.WaitEventChan(nil, c.conf.CommitTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.conf.CommitTimeout) + defer cancel() + ch := c.WaitEventChan(ctx, nil) select { case <-c.failedCh: c.t.FailNow() @@ -415,6 +415,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft { } } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + eventCh := c.WaitEventChan(ctx, filter) select { case <-c.failedCh: c.t.FailNow() @@ -422,7 +425,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft { case <-limitCh: c.FailNowf("timeout waiting for stable %s state", s) - case <-c.WaitEventChan(filter, 0): + case <-eventCh: c.logger.Debug("resetting stability timeout") case t, ok := <-timer.C: @@ -805,5 +808,6 @@ func FileSnapTest(t *testing.T) (string, *FileSnapshotStore) { if err != nil { t.Fatalf("err: %v", err) } + snap.noSync = true return dir, snap } diff --git a/vendor/github.com/hashicorp/raft/util.go b/vendor/github.com/hashicorp/raft/util.go index 6fec84a3ba..59a3f71d3f 100644 --- a/vendor/github.com/hashicorp/raft/util.go +++ b/vendor/github.com/hashicorp/raft/util.go @@ -96,6 +96,25 @@ func asyncNotifyBool(ch chan bool, v bool) { } } +// overrideNotifyBool is used to notify on a bool channel +// but override existing value if value is present. +// ch must be 1-item buffered channel. +// +// This method does not support multiple concurrent calls. +func overrideNotifyBool(ch chan bool, v bool) { + select { + case ch <- v: + // value sent, all done + case <-ch: + // channel had an old value + select { + case ch <- v: + default: + panic("race: channel was sent concurrently") + } + } +} + // Decode reverses the encode operation on a byte slice input. func decodeMsgPack(buf []byte, out interface{}) error { r := bytes.NewBuffer(buf) diff --git a/vendor/modules.txt b/vendor/modules.txt index 04b5d1c517..4087b87809 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -276,7 +276,7 @@ github.com/hashicorp/mdns github.com/hashicorp/memberlist # github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 github.com/hashicorp/net-rpc-msgpackrpc -# github.com/hashicorp/raft v1.1.2 +# github.com/hashicorp/raft v1.2.0 github.com/hashicorp/raft # github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/raft-boltdb