2020-10-08 15:07:10 -04:00

1832 lines
54 KiB
Go

package raft
import (
"bytes"
"container/list"
"fmt"
"io"
"io/ioutil"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
"github.com/armon/go-metrics"
)
const (
minCheckInterval = 10 * time.Millisecond
)
var (
keyCurrentTerm = []byte("CurrentTerm")
keyLastVoteTerm = []byte("LastVoteTerm")
keyLastVoteCand = []byte("LastVoteCand")
)
// getRPCHeader returns an initialized RPCHeader struct for the given
// Raft instance. This structure is sent along with RPC requests and
// responses.
func (r *Raft) getRPCHeader() RPCHeader {
return RPCHeader{
ProtocolVersion: r.conf.ProtocolVersion,
}
}
// checkRPCHeader houses logic about whether this instance of Raft can process
// the given RPC message.
func (r *Raft) checkRPCHeader(rpc RPC) error {
// Get the header off the RPC message.
wh, ok := rpc.Command.(WithRPCHeader)
if !ok {
return fmt.Errorf("RPC does not have a header")
}
header := wh.GetRPCHeader()
// First check is to just make sure the code can understand the
// protocol at all.
if header.ProtocolVersion < ProtocolVersionMin ||
header.ProtocolVersion > ProtocolVersionMax {
return ErrUnsupportedProtocol
}
// Second check is whether we should support this message, given the
// current protocol we are configured to run. This will drop support
// for protocol version 0 starting at protocol version 2, which is
// currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol
// changes evolve.
if header.ProtocolVersion < r.conf.ProtocolVersion-1 {
return ErrUnsupportedProtocol
}
return nil
}
// getSnapshotVersion returns the snapshot version that should be used when
// creating snapshots, given the protocol version in use.
func getSnapshotVersion(protocolVersion ProtocolVersion) SnapshotVersion {
// Right now we only have two versions and they are backwards compatible
// so we don't need to look at the protocol version.
return 1
}
// commitTuple is used to send an index that was committed,
// with an optional associated future that should be invoked.
type commitTuple struct {
log *Log
future *logFuture
}
// leaderState is state that is used while we are a leader.
type leaderState struct {
leadershipTransferInProgress int32 // indicates that a leadership transfer is in progress.
commitCh chan struct{}
commitment *commitment
inflight *list.List // list of logFuture in log index order
replState map[ServerID]*followerReplication
notify map[*verifyFuture]struct{}
stepDown chan struct{}
}
// setLeader is used to modify the current leader of the cluster
func (r *Raft) setLeader(leader ServerAddress) {
r.leaderLock.Lock()
oldLeader := r.leader
r.leader = leader
r.leaderLock.Unlock()
if oldLeader != leader {
r.observe(LeaderObservation{Leader: leader})
}
}
// requestConfigChange is a helper for the above functions that make
// configuration change requests. 'req' describes the change. For timeout,
// see AddVoter.
func (r *Raft) requestConfigChange(req configurationChangeRequest, timeout time.Duration) IndexFuture {
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
future := &configurationChangeFuture{
req: req,
}
future.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case r.configurationChangeCh <- future:
return future
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
}
}
// run is a long running goroutine that runs the Raft FSM.
func (r *Raft) run() {
for {
// Check if we are doing a shutdown
select {
case <-r.shutdownCh:
// Clear the leader to prevent forwarding
r.setLeader("")
return
default:
}
// Enter into a sub-FSM
switch r.getState() {
case Follower:
r.runFollower()
case Candidate:
r.runCandidate()
case Leader:
r.runLeader()
}
}
}
// runFollower runs the FSM for a follower.
func (r *Raft) runFollower() {
didWarn := false
r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
for r.getState() == Follower {
select {
case rpc := <-r.rpcCh:
r.processRPC(rpc)
case c := <-r.configurationChangeCh:
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)
case a := <-r.applyCh:
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)
case v := <-r.verifyCh:
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)
case r := <-r.userRestoreCh:
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
case r := <-r.leadershipTransferCh:
// Reject any operations since we are not the leader
r.respond(ErrNotLeader)
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
case b := <-r.bootstrapCh:
b.respond(r.liveBootstrap(b.configuration))
case <-heartbeatTimer:
// Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
// Check if we have had a successful contact
lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
continue
}
// Heartbeat failed! Transition to the candidate state
lastLeader := r.Leader()
r.setLeader("")
if r.configurations.latestIndex == 0 {
if !didWarn {
r.logger.Warn("no known peers, aborting election")
didWarn = true
}
} else if r.configurations.latestIndex == r.configurations.committedIndex &&
!hasVote(r.configurations.latest, r.localID) {
if !didWarn {
r.logger.Warn("not part of stable configuration, aborting election")
didWarn = true
}
} else {
r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader)
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
r.setState(Candidate)
return
}
case <-r.shutdownCh:
return
}
}
}
// liveBootstrap attempts to seed an initial configuration for the cluster. See
// the Raft object's member BootstrapCluster for more details. This must only be
// called on the main thread, and only makes sense in the follower state.
func (r *Raft) liveBootstrap(configuration Configuration) error {
// Use the pre-init API to make the static updates.
err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots,
r.trans, configuration)
if err != nil {
return err
}
// Make the configuration live.
var entry Log
if err := r.logs.GetLog(1, &entry); err != nil {
panic(err)
}
r.setCurrentTerm(1)
r.setLastLog(entry.Index, entry.Term)
r.processConfigurationLogEntry(&entry)
return nil
}
// runCandidate runs the FSM for a candidate.
func (r *Raft) runCandidate() {
r.logger.Info("entering candidate state", "node", r, "term", r.getCurrentTerm()+1)
metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1)
// Start vote for us, and set a timeout
voteCh := r.electSelf()
// Make sure the leadership transfer flag is reset after each run. Having this
// flag will set the field LeadershipTransfer in a RequestVoteRequst to true,
// which will make other servers vote even though they have a leader already.
// It is important to reset that flag, because this priviledge could be abused
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()
electionTimer := randomTimeout(r.conf.ElectionTimeout)
// Tally the votes, need a simple majority
grantedVotes := 0
votesNeeded := r.quorumSize()
r.logger.Debug("votes", "needed", votesNeeded)
for r.getState() == Candidate {
select {
case rpc := <-r.rpcCh:
r.processRPC(rpc)
case vote := <-voteCh:
// Check if the term is greater than ours, bail
if vote.Term > r.getCurrentTerm() {
r.logger.Debug("newer term discovered, fallback to follower")
r.setState(Follower)
r.setCurrentTerm(vote.Term)
return
}
// Check if the vote is granted
if vote.Granted {
grantedVotes++
r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes)
}
// Check if we've become the leader
if grantedVotes >= votesNeeded {
r.logger.Info("election won", "tally", grantedVotes)
r.setState(Leader)
r.setLeader(r.localAddr)
return
}
case c := <-r.configurationChangeCh:
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)
case a := <-r.applyCh:
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)
case v := <-r.verifyCh:
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)
case r := <-r.userRestoreCh:
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
case r := <-r.leadershipTransferCh:
// Reject any operations since we are not the leader
r.respond(ErrNotLeader)
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)
case <-electionTimer:
// Election failed! Restart the election. We simply return,
// which will kick us back into runCandidate
r.logger.Warn("Election timeout reached, restarting election")
return
case <-r.shutdownCh:
return
}
}
}
func (r *Raft) setLeadershipTransferInProgress(v bool) {
if v {
atomic.StoreInt32(&r.leaderState.leadershipTransferInProgress, 1)
} else {
atomic.StoreInt32(&r.leaderState.leadershipTransferInProgress, 0)
}
}
func (r *Raft) getLeadershipTransferInProgress() bool {
v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress)
if v == 1 {
return true
}
return false
}
func (r *Raft) setupLeaderState() {
r.leaderState.commitCh = make(chan struct{}, 1)
r.leaderState.commitment = newCommitment(r.leaderState.commitCh,
r.configurations.latest,
r.getLastIndex()+1 /* first index that may be committed in this term */)
r.leaderState.inflight = list.New()
r.leaderState.replState = make(map[ServerID]*followerReplication)
r.leaderState.notify = make(map[*verifyFuture]struct{})
r.leaderState.stepDown = make(chan struct{}, 1)
}
// runLeader runs the FSM for a leader. Do the setup here and drop into
// the leaderLoop for the hot loop.
func (r *Raft) runLeader() {
r.logger.Info("entering leader state", "leader", r)
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
// Notify that we are the leader
overrideNotifyBool(r.leaderCh, true)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
select {
case notify <- true:
case <-r.shutdownCh:
}
}
// setup leader state. This is only supposed to be accessed within the
// leaderloop.
r.setupLeaderState()
// Cleanup state on step down
defer func() {
// Since we were the leader previously, we update our
// last contact time when we step down, so that we are not
// reporting a last contact time from before we were the
// leader. Otherwise, to a client it would seem our data
// is extremely stale.
r.setLastContact()
// Stop replication
for _, p := range r.leaderState.replState {
close(p.stopCh)
}
// Respond to all inflight operations
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
e.Value.(*logFuture).respond(ErrLeadershipLost)
}
// Respond to any pending verify requests
for future := range r.leaderState.notify {
future.respond(ErrLeadershipLost)
}
// Clear all the state
r.leaderState.commitCh = nil
r.leaderState.commitment = nil
r.leaderState.inflight = nil
r.leaderState.replState = nil
r.leaderState.notify = nil
r.leaderState.stepDown = nil
// If we are stepping down for some reason, no known leader.
// We may have stepped down due to an RPC call, which would
// provide the leader, so we cannot always blank this out.
r.leaderLock.Lock()
if r.leader == r.localAddr {
r.leader = ""
}
r.leaderLock.Unlock()
// Notify that we are not the leader
overrideNotifyBool(r.leaderCh, false)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
select {
case notify <- false:
case <-r.shutdownCh:
// On shutdown, make a best effort but do not block
select {
case notify <- false:
default:
}
}
}
}()
// Start a replication routine for each peer
r.startStopReplication()
// Dispatch a no-op log entry first. This gets this leader up to the latest
// possible commit index, even in the absence of client commands. This used
// to append a configuration entry instead of a noop. However, that permits
// an unbounded number of uncommitted configurations in the log. We now
// maintain that there exists at most one uncommitted configuration entry in
// any log, so we have to do proper no-ops here.
noop := &logFuture{
log: Log{
Type: LogNoop,
},
}
r.dispatchLogs([]*logFuture{noop})
// Sit in the leader loop until we step down
r.leaderLoop()
}
// startStopReplication will set up state and start asynchronous replication to
// new peers, and stop replication to removed peers. Before removing a peer,
// it'll instruct the replication routines to try to replicate to the current
// index. This must only be called from the main thread.
func (r *Raft) startStopReplication() {
inConfig := make(map[ServerID]bool, len(r.configurations.latest.Servers))
lastIdx := r.getLastIndex()
// Start replication goroutines that need starting
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID {
continue
}
inConfig[server.ID] = true
s, ok := r.leaderState.replState[server.ID]
if !ok {
r.logger.Info("added peer, starting replication", "peer", server.ID)
s = &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
stopCh: make(chan uint64, 1),
triggerCh: make(chan struct{}, 1),
triggerDeferErrorCh: make(chan *deferError, 1),
currentTerm: r.getCurrentTerm(),
nextIndex: lastIdx + 1,
lastContact: time.Now(),
notify: make(map[*verifyFuture]struct{}),
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
r.leaderState.replState[server.ID] = s
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
r.observe(PeerObservation{Peer: server, Removed: false})
} else if ok && s.peer.Address != server.Address {
r.logger.Info("updating peer", "peer", server.ID)
s.peer = server
}
}
// Stop replication goroutines that need stopping
for serverID, repl := range r.leaderState.replState {
if inConfig[serverID] {
continue
}
// Replicate up to lastIdx and stop
r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx)
repl.stopCh <- lastIdx
close(repl.stopCh)
delete(r.leaderState.replState, serverID)
r.observe(PeerObservation{Peer: repl.peer, Removed: true})
}
// Update peers metric
metrics.SetGauge([]string{"raft", "peers"}, float32(len(r.configurations.latest.Servers)))
}
// configurationChangeChIfStable returns r.configurationChangeCh if it's safe
// to process requests from it, or nil otherwise. This must only be called
// from the main thread.
//
// Note that if the conditions here were to change outside of leaderLoop to take
// this from nil to non-nil, we would need leaderLoop to be kicked.
func (r *Raft) configurationChangeChIfStable() chan *configurationChangeFuture {
// Have to wait until:
// 1. The latest configuration is committed, and
// 2. This leader has committed some entry (the noop) in this term
// https://groups.google.com/forum/#!msg/raft-dev/t4xj6dJTP6E/d2D9LrWRza8J
if r.configurations.latestIndex == r.configurations.committedIndex &&
r.getCommitIndex() >= r.leaderState.commitment.startIndex {
return r.configurationChangeCh
}
return nil
}
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
// stepDown is used to track if there is an inflight log that
// would cause us to lose leadership (specifically a RemovePeer of
// ourselves). If this is the case, we must not allow any logs to
// be processed in parallel, otherwise we are basing commit on
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
for r.getState() == Leader {
select {
case rpc := <-r.rpcCh:
r.processRPC(rpc)
case <-r.leaderState.stepDown:
r.setState(Follower)
case future := <-r.leadershipTransferCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
r.logger.Debug("starting leadership transfer", "id", future.ID, "address", future.Address)
// When we are leaving leaderLoop, we are no longer
// leader, so we should stop transferring.
leftLeaderLoop := make(chan struct{})
defer func() { close(leftLeaderLoop) }()
stopCh := make(chan struct{})
doneCh := make(chan error, 1)
// This is intentionally being setup outside of the
// leadershipTransfer function. Because the TimeoutNow
// call is blocking and there is no way to abort that
// in case eg the timer expires.
// The leadershipTransfer function is controlled with
// the stopCh and doneCh.
go func() {
select {
case <-time.After(r.conf.ElectionTimeout):
close(stopCh)
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
future.respond(err)
<-doneCh
case <-leftLeaderLoop:
close(stopCh)
err := fmt.Errorf("lost leadership during transfer (expected)")
r.logger.Debug(err.Error())
future.respond(nil)
<-doneCh
case err := <-doneCh:
if err != nil {
r.logger.Debug(err.Error())
}
future.respond(err)
}
}()
// leaderState.replState is accessed here before
// starting leadership transfer asynchronously because
// leaderState is only supposed to be accessed in the
// leaderloop.
id := future.ID
address := future.Address
if id == nil {
s := r.pickServer()
if s != nil {
id = &s.ID
address = &s.Address
} else {
doneCh <- fmt.Errorf("cannot find peer")
continue
}
}
state, ok := r.leaderState.replState[*id]
if !ok {
doneCh <- fmt.Errorf("cannot find replication state for %v", id)
continue
}
go r.leadershipTransfer(*id, *address, state, stopCh, doneCh)
case <-r.leaderState.commitCh:
// Process the newly committed entries
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
// New configration has been committed, set it as the committed
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
}
start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
// Pull all inflight logs that are committed off the queue.
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index
if idx > commitIndex {
// Don't go past the committed index
break
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
// Process the group
if len(groupReady) != 0 {
r.processLogs(lastIdxInGroup, groupFutures)
for _, e := range groupReady {
r.leaderState.inflight.Remove(e)
}
}
// Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)
// Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))
if stepDown {
if r.conf.ShutdownOnRemove {
r.logger.Info("removed ourself, shutting down")
r.Shutdown()
} else {
r.logger.Info("removed ourself, transitioning to follower")
r.setState(Follower)
}
}
case v := <-r.verifyCh:
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
case future := <-r.userRestoreCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
err := r.restoreUserSnapshot(future.meta, future.reader)
future.respond(err)
case future := <-r.configurationsCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
future.configurations = r.configurations.Clone()
future.respond(nil)
case future := <-r.configurationChangeChIfStable():
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
r.appendConfigurationEntry(future)
case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)
case newLog := <-r.applyCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
case <-lease:
// Check if we've exceeded the lease, potentially stepping down
maxDiff := r.checkLeaderLease()
// Next check interval should adjust for the last node we've
// contacted, without going negative
checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
if checkInterval < minCheckInterval {
checkInterval = minCheckInterval
}
// Renew the lease timer
lease = time.After(checkInterval)
case <-r.shutdownCh:
return
}
}
}
// verifyLeader must be called from the main thread for safety.
// Causes the followers to attempt an immediate heartbeat.
func (r *Raft) verifyLeader(v *verifyFuture) {
// Current leader always votes for self
v.votes = 1
// Set the quorum size, hot-path for single node
v.quorumSize = r.quorumSize()
if v.quorumSize == 1 {
v.respond(nil)
return
}
// Track this request
v.notifyCh = r.verifyCh
r.leaderState.notify[v] = struct{}{}
// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
repl.notifyLock.Lock()
repl.notify[v] = struct{}{}
repl.notifyLock.Unlock()
asyncNotifyCh(repl.notifyCh)
}
}
// leadershipTransfer is doing the heavy lifting for the leadership transfer.
func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *followerReplication, stopCh chan struct{}, doneCh chan error) {
// make sure we are not already stopped
select {
case <-stopCh:
doneCh <- nil
return
default:
}
// Step 1: set this field which stops this leader from responding to any client requests.
r.setLeadershipTransferInProgress(true)
defer func() { r.setLeadershipTransferInProgress(false) }()
for atomic.LoadUint64(&repl.nextIndex) <= r.getLastIndex() {
err := &deferError{}
err.init()
repl.triggerDeferErrorCh <- err
select {
case err := <-err.errCh:
if err != nil {
doneCh <- err
return
}
case <-stopCh:
doneCh <- nil
return
}
}
// Step ?: the thesis describes in chap 6.4.1: Using clocks to reduce
// messaging for read-only queries. If this is implemented, the lease
// has to be reset as well, in case leadership is transferred. This
// implementation also has a lease, but it serves another purpose and
// doesn't need to be reset. The lease mechanism in our raft lib, is
// setup in a similar way to the one in the thesis, but in practice
// it's a timer that just tells the leader how often to check
// heartbeats are still coming in.
// Step 3: send TimeoutNow message to target server.
err := r.trans.TimeoutNow(id, address, &TimeoutNowRequest{RPCHeader: r.getRPCHeader()}, &TimeoutNowResponse{})
if err != nil {
err = fmt.Errorf("failed to make TimeoutNow RPC to %v: %v", id, err)
}
doneCh <- err
}
// checkLeaderLease is used to check if we can contact a quorum of nodes
// within the last leader lease interval. If not, we need to step down,
// as we may have lost connectivity. Returns the maximum duration without
// contact. This must only be called from the main thread.
func (r *Raft) checkLeaderLease() time.Duration {
// Track contacted nodes, we can always contact ourself
contacted := 0
// Check each follower
var maxDiff time.Duration
now := time.Now()
for _, server := range r.configurations.latest.Servers {
if server.Suffrage == Voter {
if server.ID == r.localID {
contacted++
continue
}
f := r.leaderState.replState[server.ID]
diff := now.Sub(f.LastContact())
if diff <= r.conf.LeaderLeaseTimeout {
contacted++
if diff > maxDiff {
maxDiff = diff
}
} else {
// Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout {
r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else {
r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
}
}
metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond))
}
}
// Verify we can contact a quorum
quorum := r.quorumSize()
if contacted < quorum {
r.logger.Warn("failed to contact quorum of nodes, stepping down")
r.setState(Follower)
metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1)
}
return maxDiff
}
// quorumSize is used to return the quorum size. This must only be called on
// the main thread.
// TODO: revisit usage
func (r *Raft) quorumSize() int {
voters := 0
for _, server := range r.configurations.latest.Servers {
if server.Suffrage == Voter {
voters++
}
}
return voters/2 + 1
}
// restoreUserSnapshot is used to manually consume an external snapshot, such
// as if restoring from a backup. We will use the current Raft configuration,
// not the one from the snapshot, so that we can restore into a new cluster. We
// will also use the higher of the index of the snapshot, or the current index,
// and then add 1 to that, so we force a new state with a hole in the Raft log,
// so that the snapshot will be sent to followers and used for any new joiners.
// This can only be run on the leader, and returns a future that can be used to
// block until complete.
func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now())
// Sanity check the version.
version := meta.Version
if version < SnapshotVersionMin || version > SnapshotVersionMax {
return fmt.Errorf("unsupported snapshot version %d", version)
}
// We don't support snapshots while there's a config change
// outstanding since the snapshot doesn't have a means to
// represent this state.
committedIndex := r.configurations.committedIndex
latestIndex := r.configurations.latestIndex
if committedIndex != latestIndex {
return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
latestIndex, committedIndex)
}
// Cancel any inflight requests.
for {
e := r.leaderState.inflight.Front()
if e == nil {
break
}
e.Value.(*logFuture).respond(ErrAbortedByRestore)
r.leaderState.inflight.Remove(e)
}
// We will overwrite the snapshot metadata with the current term,
// an index that's greater than the current index, or the last
// index in the snapshot. It's important that we leave a hole in
// the index so we know there's nothing in the Raft log there and
// replication will fault and send the snapshot.
term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
if meta.Index > lastIndex {
lastIndex = meta.Index
}
lastIndex++
// Dump the snapshot. Note that we use the latest configuration,
// not the one that came with the snapshot.
sink, err := r.snapshots.Create(version, lastIndex, term,
r.configurations.latest, r.configurations.latestIndex, r.trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
n, err := io.Copy(sink, reader)
if err != nil {
sink.Cancel()
return fmt.Errorf("failed to write snapshot: %v", err)
}
if n != meta.Size {
sink.Cancel()
return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size)
}
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to close snapshot: %v", err)
}
r.logger.Info("copied to local snapshot", "bytes", n)
// Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out.
fsm := &restoreFuture{ID: sink.ID()}
fsm.ShutdownCh = r.shutdownCh
fsm.init()
select {
case r.fsmMutateCh <- fsm:
case <-r.shutdownCh:
return ErrRaftShutdown
}
if err := fsm.Error(); err != nil {
panic(fmt.Errorf("failed to restore snapshot: %v", err))
}
// We set the last log so it looks like we've stored the empty
// index we burned. The last applied is set because we made the
// FSM take the snapshot state, and we store the last snapshot
// in the stable store since we created a snapshot as part of
// this process.
r.setLastLog(lastIndex, term)
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)
r.logger.Info("restored user snapshot", "index", latestIndex)
return nil
}
// appendConfigurationEntry changes the configuration and adds a new
// configuration entry to the log. This must only be called from the
// main thread.
func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
configuration, err := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future.req)
if err != nil {
future.respond(err)
return
}
r.logger.Info("updating configuration",
"command", future.req.command,
"server-id", future.req.serverID,
"server-addr", future.req.serverAddress,
"servers", hclog.Fmt("%+v", configuration.Servers))
// In pre-ID compatibility mode we translate all configuration changes
// in to an old remove peer message, which can handle all supported
// cases for peer changes in the pre-ID world (adding and removing
// voters). Both add peer and remove peer log entries are handled
// similarly on old Raft servers, but remove peer does extra checks to
// see if a leader needs to step down. Since they both assert the full
// configuration, then we can safely call remove peer for everything.
if r.protocolVersion < 2 {
future.log = Log{
Type: LogRemovePeerDeprecated,
Data: encodePeers(configuration, r.trans),
}
} else {
future.log = Log{
Type: LogConfiguration,
Data: EncodeConfiguration(configuration),
}
}
r.dispatchLogs([]*logFuture{&future.logFuture})
index := future.Index()
r.setLatestConfiguration(configuration, index)
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()
}
// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
now := time.Now()
defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
// Write the log entry locally
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("failed to commit logs", "error", err)
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
r.setState(Follower)
return
}
r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now
r.setLastLog(lastIndex, term)
// Notify the replicators of the new log
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
}
// processLogs is used to apply all the committed entries that haven't been
// applied up to the given index limit.
// This can be called from both leaders and followers.
// Followers call this from AppendEntries, for n entries at a time, and always
// pass futures=nil.
// Leaders call this when entries are committed. They pass the futures from any
// inflight logs.
func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
// Reject logs we've applied already
lastApplied := r.getLastApplied()
if index <= lastApplied {
r.logger.Warn("skipping application of old log", "index", index)
return
}
applyBatch := func(batch []*commitTuple) {
select {
case r.fsmMutateCh <- batch:
case <-r.shutdownCh:
for _, cl := range batch {
if cl.future != nil {
cl.future.respond(ErrRaftShutdown)
}
}
}
}
batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ {
var preparedLog *commitTuple
// Get the log, either from the future or from our log store
future, futureOk := futures[idx]
if futureOk {
preparedLog = r.prepareLog(&future.log, future)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Error("failed to get log", "index", idx, "error", err)
panic(err)
}
preparedLog = r.prepareLog(l, nil)
}
switch {
case preparedLog != nil:
// If we have a log ready to send to the FSM add it to the batch.
// The FSM thread will respond to the future.
batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
}
case futureOk:
// Invoke the future if given.
future.respond(nil)
}
}
// If there are any remaining logs in the batch apply them
if len(batch) != 0 {
applyBatch(batch)
}
// Update the lastApplied index and term
r.setLastApplied(index)
}
// processLog is invoked to process the application of a single committed log entry.
func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple {
switch l.Type {
case LogBarrier:
// Barrier is handled by the FSM
fallthrough
case LogCommand:
return &commitTuple{l, future}
case LogConfiguration:
// Only support this with the v2 configuration format
if r.protocolVersion > 2 {
return &commitTuple{l, future}
}
case LogAddPeerDeprecated:
case LogRemovePeerDeprecated:
case LogNoop:
// Ignore the no-op
default:
panic(fmt.Errorf("unrecognized log type: %#v", l))
}
return nil
}
// processRPC is called to handle an incoming RPC request. This must only be
// called from the main thread.
func (r *Raft) processRPC(rpc RPC) {
if err := r.checkRPCHeader(rpc); err != nil {
rpc.Respond(nil, err)
return
}
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
case *RequestVoteRequest:
r.requestVote(rpc, cmd)
case *InstallSnapshotRequest:
r.installSnapshot(rpc, cmd)
case *TimeoutNowRequest:
r.timeoutNow(rpc, cmd)
default:
r.logger.Error("got unexpected command",
"command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
// processHeartbeat is a special handler used just for heartbeat requests
// so that they can be fast-pathed if a transport supports it. This must only
// be called from the main thread.
func (r *Raft) processHeartbeat(rpc RPC) {
defer metrics.MeasureSince([]string{"raft", "rpc", "processHeartbeat"}, time.Now())
// Check if we are shutdown, just ignore the RPC
select {
case <-r.shutdownCh:
return
default:
}
// Ensure we are only handling a heartbeat
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
default:
r.logger.Error("expected heartbeat, got", "command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
// appendEntries is invoked when we get an append entries RPC call. This must
// only be called from the main thread.
func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
// Setup a response
resp := &AppendEntriesResponse{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
LastLog: r.getLastIndex(),
Success: false,
NoRetryBackoff: false,
}
var rpcErr error
defer func() {
rpc.Respond(resp, rpcErr)
}()
// Ignore an older term
if a.Term < r.getCurrentTerm() {
return
}
// Increase the term if we see a newer one, also transition to follower
// if we ever get an appendEntries call
if a.Term > r.getCurrentTerm() || r.getState() != Follower {
// Ensure transition to follower
r.setState(Follower)
r.setCurrentTerm(a.Term)
resp.Term = a.Term
}
// Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader)))
// Verify the last log entry
if a.PrevLogEntry > 0 {
lastIdx, lastTerm := r.getLastEntry()
var prevLogTerm uint64
if a.PrevLogEntry == lastIdx {
prevLogTerm = lastTerm
} else {
var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Warn("failed to get previous log",
"previous-index", a.PrevLogEntry,
"last-index", lastIdx,
"error", err)
resp.NoRetryBackoff = true
return
}
prevLogTerm = prevLog.Term
}
if a.PrevLogTerm != prevLogTerm {
r.logger.Warn("previous log term mis-match",
"ours", prevLogTerm,
"remote", a.PrevLogTerm)
resp.NoRetryBackoff = true
return
}
}
// Process any new entries
if len(a.Entries) > 0 {
start := time.Now()
// Delete any conflicting entries, skip any duplicates
lastLogIdx, _ := r.getLastLog()
var newEntries []*Log
for i, entry := range a.Entries {
if entry.Index > lastLogIdx {
newEntries = a.Entries[i:]
break
}
var storeEntry Log
if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
r.logger.Warn("failed to get log entry",
"index", entry.Index,
"error", err)
return
}
if entry.Term != storeEntry.Term {
r.logger.Warn("clearing log suffix",
"from", entry.Index,
"to", lastLogIdx)
if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
r.logger.Error("failed to clear log suffix", "error", err)
return
}
if entry.Index <= r.configurations.latestIndex {
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
}
}
if n := len(newEntries); n > 0 {
// Append the new entries
if err := r.logs.StoreLogs(newEntries); err != nil {
r.logger.Error("failed to append to logs", "error", err)
// TODO: leaving r.getLastLog() in the wrong
// state if there was a truncation above
return
}
// Handle any new configuration changes
for _, newEntry := range newEntries {
r.processConfigurationLogEntry(newEntry)
}
// Update the lastLog
last := newEntries[n-1]
r.setLastLog(last.Index, last.Term)
}
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start)
}
// Update the commit index
if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() {
start := time.Now()
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
}
// Everything went well, set success
resp.Success = true
r.setLastContact()
return
}
// processConfigurationLogEntry takes a log entry and updates the latest
// configuration if the entry results in a new configuration. This must only be
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
}
}
// requestVote is invoked when we get an request vote RPC call.
func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
r.observe(*req)
// Setup a response
resp := &RequestVoteResponse{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Granted: false,
}
var rpcErr error
defer func() {
rpc.Respond(resp, rpcErr)
}()
// Version 0 servers will panic unless the peers is present. It's only
// used on them to produce a warning message.
if r.protocolVersion < 2 {
resp.Peers = encodePeers(r.configurations.latest, r.trans)
}
// Check if we have an existing leader [who's not the candidate] and also
// check the LeadershipTransfer flag is set. Usually votes are rejected if
// there is a known leader. But if the leader initiated a leadership transfer,
// vote!
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer {
r.logger.Warn("rejecting vote request since we have a leader",
"from", candidate,
"leader", leader)
return
}
// Ignore an older term
if req.Term < r.getCurrentTerm() {
return
}
// Increase the term if we see a newer one
if req.Term > r.getCurrentTerm() {
// Ensure transition to follower
r.logger.Debug("lost leadership because received a requestVote with a newer term")
r.setState(Follower)
r.setCurrentTerm(req.Term)
resp.Term = req.Term
}
// Check if we have voted yet
lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)
if err != nil && err.Error() != "not found" {
r.logger.Error("failed to get last vote term", "error", err)
return
}
lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)
if err != nil && err.Error() != "not found" {
r.logger.Error("failed to get last vote candidate", "error", err)
return
}
// Check if we've voted in this election before
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Info("duplicate requestVote for same term", "term", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Warn("duplicate requestVote from", "candidate", candidate)
resp.Granted = true
}
return
}
// Reject if their term is older
lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm {
r.logger.Warn("rejecting vote request since our last term is greater",
"candidate", candidate,
"last-term", lastTerm,
"last-candidate-term", req.LastLogTerm)
return
}
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
r.logger.Warn("rejecting vote request since our last index is greater",
"candidate", candidate,
"last-index", lastIdx,
"last-candidate-index", req.LastLogIndex)
return
}
// Persist a vote for safety
if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error("failed to persist vote", "error", err)
return
}
resp.Granted = true
r.setLastContact()
return
}
// installSnapshot is invoked when we get a InstallSnapshot RPC call.
// We must be in the follower state for this, since it means we are
// too far behind a leader for log replay. This must only be called
// from the main thread.
func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "installSnapshot"}, time.Now())
// Setup a response
resp := &InstallSnapshotResponse{
Term: r.getCurrentTerm(),
Success: false,
}
var rpcErr error
defer func() {
io.Copy(ioutil.Discard, rpc.Reader) // ensure we always consume all the snapshot data from the stream [see issue #212]
rpc.Respond(resp, rpcErr)
}()
// Sanity check the version
if req.SnapshotVersion < SnapshotVersionMin ||
req.SnapshotVersion > SnapshotVersionMax {
rpcErr = fmt.Errorf("unsupported snapshot version %d", req.SnapshotVersion)
return
}
// Ignore an older term
if req.Term < r.getCurrentTerm() {
r.logger.Info("ignoring installSnapshot request with older term than current term",
"request-term", req.Term,
"current-term", r.getCurrentTerm())
return
}
// Increase the term if we see a newer one
if req.Term > r.getCurrentTerm() {
// Ensure transition to follower
r.setState(Follower)
r.setCurrentTerm(req.Term)
resp.Term = req.Term
}
// Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(req.Leader)))
// Create a new snapshot
var reqConfiguration Configuration
var reqConfigurationIndex uint64
if req.SnapshotVersion > 0 {
reqConfiguration = DecodeConfiguration(req.Configuration)
reqConfigurationIndex = req.ConfigurationIndex
} else {
reqConfiguration = decodePeers(req.Peers, r.trans)
reqConfigurationIndex = req.LastLogIndex
}
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm,
reqConfiguration, reqConfigurationIndex, r.trans)
if err != nil {
r.logger.Error("failed to create snapshot to install", "error", err)
rpcErr = fmt.Errorf("failed to create snapshot: %v", err)
return
}
// Spill the remote snapshot to disk
n, err := io.Copy(sink, rpc.Reader)
if err != nil {
sink.Cancel()
r.logger.Error("failed to copy snapshot", "error", err)
rpcErr = err
return
}
// Check that we received it all
if n != req.Size {
sink.Cancel()
r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
rpcErr = fmt.Errorf("short read")
return
}
// Finalize the snapshot
if err := sink.Close(); err != nil {
r.logger.Error("failed to finalize snapshot", "error", err)
rpcErr = err
return
}
r.logger.Info("copied to local snapshot", "bytes", n)
// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
future.ShutdownCh = r.shutdownCh
future.init()
select {
case r.fsmMutateCh <- future:
case <-r.shutdownCh:
future.respond(ErrRaftShutdown)
return
}
// Wait for the restore to happen
if err := future.Error(); err != nil {
r.logger.Error("failed to restore snapshot", "error", err)
rpcErr = err
return
}
// Update the lastApplied so we don't replay old logs
r.setLastApplied(req.LastLogIndex)
// Update the last stable snapshot info
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)
// Restore the peer set
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)
// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
r.logger.Error("failed to compact logs", "error", err)
}
r.logger.Info("Installed remote snapshot")
resp.Success = true
r.setLastContact()
return
}
// setLastContact is used to set the last contact time to now
func (r *Raft) setLastContact() {
r.lastContactLock.Lock()
r.lastContact = time.Now()
r.lastContactLock.Unlock()
}
type voteResult struct {
RequestVoteResponse
voterID ServerID
}
// electSelf is used to send a RequestVote RPC to all peers, and vote for
// ourself. This has the side affecting of incrementing the current term. The
// response channel returned is used to wait for all the responses (including a
// vote for ourself). This must only be called from the main thread.
func (r *Raft) electSelf() <-chan *voteResult {
// Create a response channel
respCh := make(chan *voteResult, len(r.configurations.latest.Servers))
// Increment the term
r.setCurrentTerm(r.getCurrentTerm() + 1)
// Construct the request
lastIdx, lastTerm := r.getLastEntry()
req := &RequestVoteRequest{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
LeadershipTransfer: r.candidateFromLeadershipTransfer,
}
// Construct a function to ask for a vote
askPeer := func(peer Server) {
r.goFunc(func() {
defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now())
resp := &voteResult{voterID: peer.ID}
err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse)
if err != nil {
r.logger.Error("failed to make requestVote RPC",
"target", peer,
"error", err)
resp.Term = req.Term
resp.Granted = false
}
respCh <- resp
})
}
// For each peer, request a vote
for _, server := range r.configurations.latest.Servers {
if server.Suffrage == Voter {
if server.ID == r.localID {
// Persist a vote for ourselves
if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error("failed to persist vote", "error", err)
return nil
}
// Include our own vote
respCh <- &voteResult{
RequestVoteResponse: RequestVoteResponse{
RPCHeader: r.getRPCHeader(),
Term: req.Term,
Granted: true,
},
voterID: r.localID,
}
} else {
askPeer(server)
}
}
}
return respCh
}
// persistVote is used to persist our vote for safety.
func (r *Raft) persistVote(term uint64, candidate []byte) error {
if err := r.stable.SetUint64(keyLastVoteTerm, term); err != nil {
return err
}
if err := r.stable.Set(keyLastVoteCand, candidate); err != nil {
return err
}
return nil
}
// setCurrentTerm is used to set the current term in a durable manner.
func (r *Raft) setCurrentTerm(t uint64) {
// Persist to disk first
if err := r.stable.SetUint64(keyCurrentTerm, t); err != nil {
panic(fmt.Errorf("failed to save current term: %v", err))
}
r.raftState.setCurrentTerm(t)
}
// setState is used to update the current state. Any state
// transition causes the known leader to be cleared. This means
// that leader should be set only after updating the state.
func (r *Raft) setState(state RaftState) {
r.setLeader("")
oldState := r.raftState.getState()
r.raftState.setState(state)
if oldState != state {
r.observe(state)
}
}
// LookupServer looks up a server by ServerID.
func (r *Raft) lookupServer(id ServerID) *Server {
for _, server := range r.configurations.latest.Servers {
if server.ID != r.localID {
return &server
}
}
return nil
}
// pickServer returns the follower that is most up to date and participating in quorum.
// Because it accesses leaderstate, it should only be called from the leaderloop.
func (r *Raft) pickServer() *Server {
var pick *Server
var current uint64
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID || server.Suffrage != Voter {
continue
}
state, ok := r.leaderState.replState[server.ID]
if !ok {
continue
}
nextIdx := atomic.LoadUint64(&state.nextIndex)
if nextIdx > current {
current = nextIdx
tmp := server
pick = &tmp
}
}
return pick
}
// initiateLeadershipTransfer starts the leadership on the leader side, by
// sending a message to the leadershipTransferCh, to make sure it runs in the
// mainloop.
func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress) LeadershipTransferFuture {
future := &leadershipTransferFuture{ID: id, Address: address}
future.init()
if id != nil && *id == r.localID {
err := fmt.Errorf("cannot transfer leadership to itself")
r.logger.Info(err.Error())
future.respond(err)
return future
}
select {
case r.leadershipTransferCh <- future:
return future
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
default:
return errorFuture{ErrEnqueueTimeout}
}
}
// timeoutNow is what happens when a server receives a TimeoutNowRequest.
func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.setLeader("")
r.setState(Candidate)
r.candidateFromLeadershipTransfer = true
rpc.Respond(&TimeoutNowResponse{}, nil)
}
// setLatestConfiguration stores the latest configuration and updates a copy of it.
func (r *Raft) setLatestConfiguration(c Configuration, i uint64) {
r.configurations.latest = c
r.configurations.latestIndex = i
r.latestConfiguration.Store(c.Clone())
}
// setCommittedConfiguration stores the committed configuration.
func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) {
r.configurations.committed = c
r.configurations.committedIndex = i
}
// getLatestConfiguration reads the configuration from a copy of the main
// configuration, which means it can be accessed independently from the main
// loop.
func (r *Raft) getLatestConfiguration() Configuration {
// this switch catches the case where this is called without having set
// a configuration previously.
switch c := r.latestConfiguration.Load().(type) {
case Configuration:
return c
default:
return Configuration{}
}
}