Update Raft Vendoring (#4539)

Pulls in a fix for a potential memory leak regarding consistent reads that invoke VerifyLeader.
Matt Keeler 2018-09-06 15:07:42 -04:00 committed by GitHub
parent 9b96b4baea
commit d1e52e5292
8 changed files with 120 additions and 22 deletions
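The leak in question involves Raft's VerifyLeader futures: a strongly consistent read first asks the leader to confirm its leadership against a quorum, and each such check registers a verifyFuture with every follower's replication routine. Before this change those futures sat in a per-follower slice that was only cleared when a heartbeat was acknowledged, so they could pile up against a slow or unreachable follower. The following sketch assumes a *raft.Raft handle and uses an illustrative consistentRead helper rather than Consul's actual read path; it only shows where VerifyLeader enters the picture.

package example

import "github.com/hashicorp/raft"

// consistentRead is a hypothetical helper, not Consul's real read path. A
// strongly consistent read first calls VerifyLeader, which fans a verifyFuture
// out to every follower; the vendored fix below lets those futures be removed
// as soon as they are resolved instead of lingering until the next
// acknowledged heartbeat.
func consistentRead(r *raft.Raft, read func() ([]byte, error)) ([]byte, error) {
	if err := r.VerifyLeader().Error(); err != nil {
		return nil, err // lost leadership or could not reach a quorum
	}
	return read()
}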


@@ -717,12 +717,12 @@ func (r *Raft) RemovePeer(peer ServerAddress) Future {
 }
 
-// AddVoter will add the given server to the cluster as a staging server. If the
-// server is already in the cluster as a voter, this does nothing. This must be
-// run on the leader or it will fail. The leader will promote the staging server
-// to a voter once that server is ready. If nonzero, prevIndex is the index of
-// the only configuration upon which this change may be applied; if another
-// configuration entry has been added in the meantime, this request will fail.
-// If nonzero, timeout is how long this server should wait before the
-// configuration change log entry is appended.
+// AddVoter will add the given server to the cluster as a staging server. If the
+// server is already in the cluster as a voter, this updates the server's address.
+// This must be run on the leader or it will fail. The leader will promote the
+// staging server to a voter once that server is ready. If nonzero, prevIndex is
+// the index of the only configuration upon which this change may be applied; if
+// another configuration entry has been added in the meantime, this request will
+// fail. If nonzero, timeout is how long this server should wait before the
+// configuration change log entry is appended.
 func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
 	if r.protocolVersion < 2 {
@@ -739,9 +739,9 @@ func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, ti
 // AddNonvoter will add the given server to the cluster but won't assign it a
 // vote. The server will receive log entries, but it won't participate in
-// elections or log entry commitment. If the server is already in the cluster as
-// a staging server or voter, this does nothing. This must be run on the leader
-// or it will fail. For prevIndex and timeout, see AddVoter.
+// elections or log entry commitment. If the server is already in the cluster,
+// this updates the server's address. This must be run on the leader or it will
+// fail. For prevIndex and timeout, see AddVoter.
 func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
 	if r.protocolVersion < 3 {
 		return errorFuture{ErrUnsupportedProtocol}
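The doc-comment change above reflects new behavior in the vendored library: re-adding a server that is already in the configuration now updates its address rather than doing nothing. A minimal sketch, assuming the standard AddVoter API, with the helper name, ID, address, and timeout purely illustrative:

package example

import (
	"time"

	"github.com/hashicorp/raft"
)

// updateVoterAddress is a hypothetical helper: calling AddVoter with an ID
// that is already a voter but a new address updates the stored address in the
// cluster configuration. It must be run on the leader.
func updateVoterAddress(r *raft.Raft, id raft.ServerID, newAddr raft.ServerAddress) error {
	return r.AddVoter(id, newAddr, 0, 10*time.Second).Error()
}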


@@ -2,6 +2,7 @@ package raft
 import (
 	"bufio"
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -76,6 +77,11 @@ type NetworkTransport struct {
 	stream StreamLayer
 
+	// streamCtx is used to cancel existing connection handlers.
+	streamCtx     context.Context
+	streamCancel  context.CancelFunc
+	streamCtxLock sync.RWMutex
+
 	timeout      time.Duration
 	TimeoutScale int
 }
@@ -154,7 +160,11 @@ func NewNetworkTransportWithConfig(
 		TimeoutScale:          DefaultTimeoutScale,
 		serverAddressProvider: config.ServerAddressProvider,
 	}
+
+	// Create the connection context and then start our listener.
+	trans.setupStreamContext()
 	go trans.listen()
+
 	return trans
 }
@@ -190,6 +200,21 @@ func NewNetworkTransportWithLogger(
 	return NewNetworkTransportWithConfig(config)
 }
 
+// setupStreamContext is used to create a new stream context. This should be
+// called with the stream lock held.
+func (n *NetworkTransport) setupStreamContext() {
+	ctx, cancel := context.WithCancel(context.Background())
+	n.streamCtx = ctx
+	n.streamCancel = cancel
+}
+
+// getStreamContext is used retrieve the current stream context.
+func (n *NetworkTransport) getStreamContext() context.Context {
+	n.streamCtxLock.RLock()
+	defer n.streamCtxLock.RUnlock()
+	return n.streamCtx
+}
+
 // SetHeartbeatHandler is used to setup a heartbeat handler
 // as a fast-pass. This is to avoid head-of-line blocking from
 // disk IO.
@@ -199,6 +224,31 @@ func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) {
 	n.heartbeatFn = cb
 }
 
+// CloseStreams closes the current streams.
+func (n *NetworkTransport) CloseStreams() {
+	n.connPoolLock.Lock()
+	defer n.connPoolLock.Unlock()
+
+	// Close all the connections in the connection pool and then remove their
+	// entry.
+	for k, e := range n.connPool {
+		for _, conn := range e {
+			conn.Release()
+		}
+
+		delete(n.connPool, k)
+	}
+
+	// Cancel the existing connections and create a new context. Both these
+	// operations must always be done with the lock held otherwise we can create
+	// connection handlers that are holding a context that will never be
+	// cancelable.
+	n.streamCtxLock.Lock()
+	n.streamCancel()
+	n.setupStreamContext()
+	n.streamCtxLock.Unlock()
+}
+
 // Close is used to stop the network transport.
 func (n *NetworkTransport) Close() error {
 	n.shutdownLock.Lock()
@@ -259,7 +309,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
 	if n.serverAddressProvider != nil {
 		serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
 		if err != nil {
-			n.logger.Printf("[WARN] Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
+			n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
 		} else {
 			return serverAddressOverride
 		}
@@ -424,12 +474,14 @@ func (n *NetworkTransport) listen() {
 		n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr())
 
 		// Handle the connection in dedicated routine
-		go n.handleConn(conn)
+		go n.handleConn(n.getStreamContext(), conn)
 	}
 }
 
-// handleConn is used to handle an inbound connection for its lifespan.
-func (n *NetworkTransport) handleConn(conn net.Conn) {
+// handleConn is used to handle an inbound connection for its lifespan. The
+// handler will exit when the passed context is cancelled or the connection is
+// closed.
+func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
 	defer conn.Close()
 	r := bufio.NewReader(conn)
 	w := bufio.NewWriter(conn)
@@ -437,6 +489,13 @@ func (n *NetworkTransport) handleConn(conn net.Conn) {
 	enc := codec.NewEncoder(w, &codec.MsgpackHandle{})
 
 	for {
+		select {
+		case <-connCtx.Done():
+			n.logger.Println("[DEBUG] raft-net: stream layer is closed")
+			return
+		default:
+		}
+
 		if err := n.handleCommand(r, dec, enc); err != nil {
 			if err != io.EOF {
 				n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err)


@@ -13,6 +13,11 @@ type Observation struct {
 	Data interface{}
 }
 
+// LeaderObservation is used for the data when leadership changes.
+type LeaderObservation struct {
+	leader ServerAddress
+}
+
 // nextObserverId is used to provide a unique ID for each observer to aid in
 // deregistration.
 var nextObserverID uint64
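LeaderObservation rides on raft's existing observer mechanism, so a consumer can react to leadership changes without polling. A minimal sketch, assuming the package's NewObserver/RegisterObserver API; note that in this revision the leader field is unexported, so outside the package the observation only signals that a change happened:

package example

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership is a hypothetical consumer of the new observation type. It
// registers an observer and, whenever a LeaderObservation arrives, queries the
// current leader directly since the observation's field is not exported here.
func watchLeadership(r *raft.Raft) {
	ch := make(chan raft.Observation, 16)
	r.RegisterObserver(raft.NewObserver(ch, false, nil))
	go func() {
		for obs := range ch {
			if _, ok := obs.Data.(raft.LeaderObservation); ok {
				log.Printf("leadership changed, current leader: %s", r.Leader())
			}
		}
	}()
}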


@@ -88,8 +88,12 @@ type leaderState struct {
 // setLeader is used to modify the current leader of the cluster
 func (r *Raft) setLeader(leader ServerAddress) {
 	r.leaderLock.Lock()
+	oldLeader := r.leader
 	r.leader = leader
 	r.leaderLock.Unlock()
+	if oldLeader != leader {
+		r.observe(LeaderObservation{leader: leader})
+	}
 }
 
 // requestConfigChange is a helper for the above functions that make
@@ -440,6 +444,7 @@ func (r *Raft) startStopReplication() {
 			currentTerm: r.getCurrentTerm(),
 			nextIndex:   lastIdx + 1,
 			lastContact: time.Now(),
+			notify:      make(map[*verifyFuture]struct{}),
 			notifyCh:    make(chan struct{}, 1),
 			stepDown:    r.leaderState.stepDown,
 		}
@@ -551,11 +556,17 @@ func (r *Raft) leaderLoop() {
 				r.logger.Printf("[WARN] raft: New leader elected, stepping down")
 				r.setState(Follower)
 				delete(r.leaderState.notify, v)
+				for _, repl := range r.leaderState.replState {
+					repl.cleanNotify(v)
+				}
 				v.respond(ErrNotLeader)
 
 			} else {
 				// Quorum of members agree, we are still leader
 				delete(r.leaderState.notify, v)
+				for _, repl := range r.leaderState.replState {
+					repl.cleanNotify(v)
+				}
 				v.respond(nil)
 			}
@@ -635,7 +646,7 @@ func (r *Raft) verifyLeader(v *verifyFuture) {
 	// Trigger immediate heartbeats
 	for _, repl := range r.leaderState.replState {
 		repl.notifyLock.Lock()
-		repl.notify = append(repl.notify, v)
+		repl.notify[v] = struct{}{}
 		repl.notifyLock.Unlock()
 		asyncNotifyCh(repl.notifyCh)
 	}


@@ -64,9 +64,9 @@ type followerReplication struct {
 	// notifyCh is notified to send out a heartbeat, which is used to check that
 	// this server is still leader.
 	notifyCh chan struct{}
-	// notify is a list of futures to be resolved upon receipt of an
-	// acknowledgement, then cleared from this list.
-	notify []*verifyFuture
+	// notify is a map of futures to be resolved upon receipt of an
+	// acknowledgement, then cleared from this map.
+	notify map[*verifyFuture]struct{}
 	// notifyLock protects 'notify'.
 	notifyLock sync.Mutex
@@ -85,15 +85,22 @@ func (s *followerReplication) notifyAll(leader bool) {
 	// Clear the waiting notifies minimizing lock time
 	s.notifyLock.Lock()
 	n := s.notify
-	s.notify = nil
+	s.notify = make(map[*verifyFuture]struct{})
 	s.notifyLock.Unlock()
 
 	// Submit our votes
-	for _, v := range n {
+	for v, _ := range n {
 		v.vote(leader)
 	}
 }
 
+// cleanNotify is used to delete notify, .
+func (s *followerReplication) cleanNotify(v *verifyFuture) {
+	s.notifyLock.Lock()
+	delete(s.notify, v)
+	s.notifyLock.Unlock()
+}
+
 // LastContact returns the time of last contact.
 func (s *followerReplication) LastContact() time.Time {
 	s.lastContactLock.RLock()
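This is the heart of the memory-leak fix: notify becomes a set keyed by future pointer, so the leader loop can drop a verify future from every follower's pending set the moment it is resolved (cleanNotify), instead of leaving it to accumulate until that follower next acknowledges a heartbeat (notifyAll). An illustrative, self-contained version of that bookkeeping pattern, not the vendored type itself (request and pendingSet are stand-in names):

package example

import "sync"

// request stands in for raft's verifyFuture in this illustration.
type request struct {
	notify func(ok bool)
}

// pendingSet mirrors the map-based bookkeeping adopted above.
type pendingSet struct {
	mu      sync.Mutex
	pending map[*request]struct{}
}

// add registers a request, as verifyLeader now does with repl.notify[v] = struct{}{}.
func (p *pendingSet) add(r *request) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.pending == nil {
		p.pending = make(map[*request]struct{})
	}
	p.pending[r] = struct{}{}
}

// remove drops a single resolved request, the analogue of cleanNotify.
func (p *pendingSet) remove(r *request) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, r)
}

// drainAndNotify takes everything pending, resets the map, and answers each
// request outside the lock, the analogue of notifyAll.
func (p *pendingSet) drainAndNotify(ok bool) {
	p.mu.Lock()
	n := p.pending
	p.pending = make(map[*request]struct{})
	p.mu.Unlock()
	for r := range n {
		r.notify(ok)
	}
}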

vendor/github.com/hashicorp/raft/tag.sh (new file: generated, vendored, executable)

@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -e
+
+# The version must be supplied from the environment. Do not include the
+# leading "v".
+if [ -z $VERSION ]; then
+    echo "Please specify a version."
+    exit 1
+fi
+
+# Generate the tag.
+echo "==> Tagging version $VERSION..."
+git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION"
+git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master
+
+exit 0


@@ -47,8 +47,8 @@ func NewTCPTransportWithLogger(
 	})
 }
 
-// NewTCPTransportWithLogger returns a NetworkTransport that is built on top of
-// a TCP streaming transport layer, using a default logger and the address provider
+// NewTCPTransportWithConfig returns a NetworkTransport that is built on top of
+// a TCP streaming transport layer, using the given config struct.
 func NewTCPTransportWithConfig(
 	bindAddr string,
 	advertise net.Addr,

vendor/vendor.json (vendored)

@@ -122,7 +122,7 @@
 {"path":"github.com/hashicorp/logutils","checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","revision":"0dc08b1671f34c4250ce212759ebd880f743d883","revisionTime":"2015-06-09T07:04:31Z"},
 {"path":"github.com/hashicorp/memberlist","checksumSHA1":"q6yTL5vSGnWxUtcocVU3YIG/HNc=","revision":"b195c8e4fcc6284fff1583fd6ab09e68ca207551","revisionTime":"2018-08-09T14:04:54Z"},
 {"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"},
-{"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"},
+{"path":"github.com/hashicorp/raft","checksumSHA1":"3U9bQLEMikE47n4TZP6uOdgXIyQ=","revision":"da92cfe76e0c1c9b94bbc9d884ec4b2b3b90b699","revisionTime":"2018-08-17T18:12:11Z","version":"master","versionExact":"master"},
 {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
 {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"},
 {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"axdQxCEwvUr1AygfYIMMxPkS1pY=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"},