diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index 73f057c985..c43e4cc599 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -717,12 +717,12 @@ func (r *Raft) RemovePeer(peer ServerAddress) Future { } // AddVoter will add the given server to the cluster as a staging server. If the -// server is already in the cluster as a voter, this does nothing. This must be -// run on the leader or it will fail. The leader will promote the staging server -// to a voter once that server is ready. If nonzero, prevIndex is the index of -// the only configuration upon which this change may be applied; if another -// configuration entry has been added in the meantime, this request will fail. -// If nonzero, timeout is how long this server should wait before the +// server is already in the cluster as a voter, this updates the server's address. +// This must be run on the leader or it will fail. The leader will promote the +// staging server to a voter once that server is ready. If nonzero, prevIndex is +// the index of the only configuration upon which this change may be applied; if +// another configuration entry has been added in the meantime, this request will +// fail. If nonzero, timeout is how long this server should wait before the // configuration change log entry is appended. func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture { if r.protocolVersion < 2 { @@ -739,9 +739,9 @@ func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, ti // AddNonvoter will add the given server to the cluster but won't assign it a // vote. The server will receive log entries, but it won't participate in -// elections or log entry commitment. If the server is already in the cluster as -// a staging server or voter, this does nothing. This must be run on the leader -// or it will fail. For prevIndex and timeout, see AddVoter. +// elections or log entry commitment. If the server is already in the cluster, +// this updates the server's address. This must be run on the leader or it will +// fail. For prevIndex and timeout, see AddVoter. func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture { if r.protocolVersion < 3 { return errorFuture{ErrUnsupportedProtocol} diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index 9555a0eaeb..cf5f4c7d0e 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -2,6 +2,7 @@ package raft import ( "bufio" + "context" "errors" "fmt" "io" @@ -76,6 +77,11 @@ type NetworkTransport struct { stream StreamLayer + // streamCtx is used to cancel existing connection handlers. + streamCtx context.Context + streamCancel context.CancelFunc + streamCtxLock sync.RWMutex + timeout time.Duration TimeoutScale int } @@ -154,7 +160,11 @@ func NewNetworkTransportWithConfig( TimeoutScale: DefaultTimeoutScale, serverAddressProvider: config.ServerAddressProvider, } + + // Create the connection context and then start our listener. + trans.setupStreamContext() go trans.listen() + return trans } @@ -190,6 +200,21 @@ func NewNetworkTransportWithLogger( return NewNetworkTransportWithConfig(config) } +// setupStreamContext is used to create a new stream context. This should be +// called with the stream lock held. +func (n *NetworkTransport) setupStreamContext() { + ctx, cancel := context.WithCancel(context.Background()) + n.streamCtx = ctx + n.streamCancel = cancel +} + +// getStreamContext is used retrieve the current stream context. +func (n *NetworkTransport) getStreamContext() context.Context { + n.streamCtxLock.RLock() + defer n.streamCtxLock.RUnlock() + return n.streamCtx +} + // SetHeartbeatHandler is used to setup a heartbeat handler // as a fast-pass. This is to avoid head-of-line blocking from // disk IO. @@ -199,6 +224,31 @@ func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) { n.heartbeatFn = cb } +// CloseStreams closes the current streams. +func (n *NetworkTransport) CloseStreams() { + n.connPoolLock.Lock() + defer n.connPoolLock.Unlock() + + // Close all the connections in the connection pool and then remove their + // entry. + for k, e := range n.connPool { + for _, conn := range e { + conn.Release() + } + + delete(n.connPool, k) + } + + // Cancel the existing connections and create a new context. Both these + // operations must always be done with the lock held otherwise we can create + // connection handlers that are holding a context that will never be + // cancelable. + n.streamCtxLock.Lock() + n.streamCancel() + n.setupStreamContext() + n.streamCtxLock.Unlock() +} + // Close is used to stop the network transport. func (n *NetworkTransport) Close() error { n.shutdownLock.Lock() @@ -259,7 +309,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv if n.serverAddressProvider != nil { serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) if err != nil { - n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err) + n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err) } else { return serverAddressOverride } @@ -424,12 +474,14 @@ func (n *NetworkTransport) listen() { n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) // Handle the connection in dedicated routine - go n.handleConn(conn) + go n.handleConn(n.getStreamContext(), conn) } } -// handleConn is used to handle an inbound connection for its lifespan. -func (n *NetworkTransport) handleConn(conn net.Conn) { +// handleConn is used to handle an inbound connection for its lifespan. The +// handler will exit when the passed context is cancelled or the connection is +// closed. +func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { defer conn.Close() r := bufio.NewReader(conn) w := bufio.NewWriter(conn) @@ -437,6 +489,13 @@ func (n *NetworkTransport) handleConn(conn net.Conn) { enc := codec.NewEncoder(w, &codec.MsgpackHandle{}) for { + select { + case <-connCtx.Done(): + n.logger.Println("[DEBUG] raft-net: stream layer is closed") + return + default: + } + if err := n.handleCommand(r, dec, enc); err != nil { if err != io.EOF { n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err) diff --git a/vendor/github.com/hashicorp/raft/observer.go b/vendor/github.com/hashicorp/raft/observer.go index 76c4d555df..bce17ef19a 100644 --- a/vendor/github.com/hashicorp/raft/observer.go +++ b/vendor/github.com/hashicorp/raft/observer.go @@ -13,6 +13,11 @@ type Observation struct { Data interface{} } +// LeaderObservation is used for the data when leadership changes. +type LeaderObservation struct { + leader ServerAddress +} + // nextObserverId is used to provide a unique ID for each observer to aid in // deregistration. var nextObserverID uint64 diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index 50ae6e916c..395ecf7454 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -88,8 +88,12 @@ type leaderState struct { // setLeader is used to modify the current leader of the cluster func (r *Raft) setLeader(leader ServerAddress) { r.leaderLock.Lock() + oldLeader := r.leader r.leader = leader r.leaderLock.Unlock() + if oldLeader != leader { + r.observe(LeaderObservation{leader: leader}) + } } // requestConfigChange is a helper for the above functions that make @@ -440,6 +444,7 @@ func (r *Raft) startStopReplication() { currentTerm: r.getCurrentTerm(), nextIndex: lastIdx + 1, lastContact: time.Now(), + notify: make(map[*verifyFuture]struct{}), notifyCh: make(chan struct{}, 1), stepDown: r.leaderState.stepDown, } @@ -551,11 +556,17 @@ func (r *Raft) leaderLoop() { r.logger.Printf("[WARN] raft: New leader elected, stepping down") r.setState(Follower) delete(r.leaderState.notify, v) + for _, repl := range r.leaderState.replState { + repl.cleanNotify(v) + } v.respond(ErrNotLeader) } else { // Quorum of members agree, we are still leader delete(r.leaderState.notify, v) + for _, repl := range r.leaderState.replState { + repl.cleanNotify(v) + } v.respond(nil) } @@ -635,7 +646,7 @@ func (r *Raft) verifyLeader(v *verifyFuture) { // Trigger immediate heartbeats for _, repl := range r.leaderState.replState { repl.notifyLock.Lock() - repl.notify = append(repl.notify, v) + repl.notify[v] = struct{}{} repl.notifyLock.Unlock() asyncNotifyCh(repl.notifyCh) } diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index e631b5a09b..f5903db114 100644 --- a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -64,9 +64,9 @@ type followerReplication struct { // notifyCh is notified to send out a heartbeat, which is used to check that // this server is still leader. notifyCh chan struct{} - // notify is a list of futures to be resolved upon receipt of an - // acknowledgement, then cleared from this list. - notify []*verifyFuture + // notify is a map of futures to be resolved upon receipt of an + // acknowledgement, then cleared from this map. + notify map[*verifyFuture]struct{} // notifyLock protects 'notify'. notifyLock sync.Mutex @@ -85,15 +85,22 @@ func (s *followerReplication) notifyAll(leader bool) { // Clear the waiting notifies minimizing lock time s.notifyLock.Lock() n := s.notify - s.notify = nil + s.notify = make(map[*verifyFuture]struct{}) s.notifyLock.Unlock() // Submit our votes - for _, v := range n { + for v, _ := range n { v.vote(leader) } } +// cleanNotify is used to delete notify, . +func (s *followerReplication) cleanNotify(v *verifyFuture) { + s.notifyLock.Lock() + delete(s.notify, v) + s.notifyLock.Unlock() +} + // LastContact returns the time of last contact. func (s *followerReplication) LastContact() time.Time { s.lastContactLock.RLock() diff --git a/vendor/github.com/hashicorp/raft/tag.sh b/vendor/github.com/hashicorp/raft/tag.sh new file mode 100755 index 0000000000..cd16623a70 --- /dev/null +++ b/vendor/github.com/hashicorp/raft/tag.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +# The version must be supplied from the environment. Do not include the +# leading "v". +if [ -z $VERSION ]; then + echo "Please specify a version." + exit 1 +fi + +# Generate the tag. +echo "==> Tagging version $VERSION..." +git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION" +git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master + +exit 0 diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index 29b2740f62..69c928ed92 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -47,8 +47,8 @@ func NewTCPTransportWithLogger( }) } -// NewTCPTransportWithLogger returns a NetworkTransport that is built on top of -// a TCP streaming transport layer, using a default logger and the address provider +// NewTCPTransportWithConfig returns a NetworkTransport that is built on top of +// a TCP streaming transport layer, using the given config struct. func NewTCPTransportWithConfig( bindAddr string, advertise net.Addr, diff --git a/vendor/vendor.json b/vendor/vendor.json index fc9cb7af6b..031342cfdc 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -122,7 +122,7 @@ {"path":"github.com/hashicorp/logutils","checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","revision":"0dc08b1671f34c4250ce212759ebd880f743d883","revisionTime":"2015-06-09T07:04:31Z"}, {"path":"github.com/hashicorp/memberlist","checksumSHA1":"q6yTL5vSGnWxUtcocVU3YIG/HNc=","revision":"b195c8e4fcc6284fff1583fd6ab09e68ca207551","revisionTime":"2018-08-09T14:04:54Z"}, {"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"}, - {"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"}, + {"path":"github.com/hashicorp/raft","checksumSHA1":"3U9bQLEMikE47n4TZP6uOdgXIyQ=","revision":"da92cfe76e0c1c9b94bbc9d884ec4b2b3b90b699","revisionTime":"2018-08-17T18:12:11Z","version":"master","versionExact":"master"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"}, {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"axdQxCEwvUr1AygfYIMMxPkS1pY=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"},