2020-10-08 15:07:10 -04:00

610 lines
19 KiB
Go

package raft
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
)
const (
maxFailureScale = 12
failureWait = 10 * time.Millisecond
)
var (
// ErrLogNotFound indicates a given log entry is not available.
ErrLogNotFound = errors.New("log not found")
// ErrPipelineReplicationNotSupported can be returned by the transport to
// signal that pipeline replication is not supported in general, and that
// no error message should be produced.
ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
)
// followerReplication is in charge of sending snapshots and log entries from
// this leader during this particular term to a remote follower.
type followerReplication struct {
// currentTerm and nextIndex must be kept at the top of the struct so
// they're 64 bit aligned which is a requirement for atomic ops on 32 bit
// platforms.
// currentTerm is the term of this leader, to be included in AppendEntries
// requests.
currentTerm uint64
// nextIndex is the index of the next log entry to send to the follower,
// which may fall past the end of the log.
nextIndex uint64
// peer contains the network address and ID of the remote follower.
peer Server
// commitment tracks the entries acknowledged by followers so that the
// leader's commit index can advance. It is updated on successful
// AppendEntries responses.
commitment *commitment
// stopCh is notified/closed when this leader steps down or the follower is
// removed from the cluster. In the follower removed case, it carries a log
// index; replication should be attempted with a best effort up through that
// index, before exiting.
stopCh chan uint64
// triggerCh is notified every time new entries are appended to the log.
triggerCh chan struct{}
// triggerDeferErrorCh is used to provide a backchannel. By sending a
// deferErr, the sender can be notifed when the replication is done.
triggerDeferErrorCh chan *deferError
// lastContact is updated to the current time whenever any response is
// received from the follower (successful or not). This is used to check
// whether the leader should step down (Raft.checkLeaderLease()).
lastContact time.Time
// lastContactLock protects 'lastContact'.
lastContactLock sync.RWMutex
// failures counts the number of failed RPCs since the last success, which is
// used to apply backoff.
failures uint64
// notifyCh is notified to send out a heartbeat, which is used to check that
// this server is still leader.
notifyCh chan struct{}
// notify is a map of futures to be resolved upon receipt of an
// acknowledgement, then cleared from this map.
notify map[*verifyFuture]struct{}
// notifyLock protects 'notify'.
notifyLock sync.Mutex
// stepDown is used to indicate to the leader that we
// should step down based on information from a follower.
stepDown chan struct{}
// allowPipeline is used to determine when to pipeline the AppendEntries RPCs.
// It is private to this replication goroutine.
allowPipeline bool
}
// notifyAll is used to notify all the waiting verify futures
// if the follower believes we are still the leader.
func (s *followerReplication) notifyAll(leader bool) {
// Clear the waiting notifies minimizing lock time
s.notifyLock.Lock()
n := s.notify
s.notify = make(map[*verifyFuture]struct{})
s.notifyLock.Unlock()
// Submit our votes
for v := range n {
v.vote(leader)
}
}
// cleanNotify is used to delete notify, .
func (s *followerReplication) cleanNotify(v *verifyFuture) {
s.notifyLock.Lock()
delete(s.notify, v)
s.notifyLock.Unlock()
}
// LastContact returns the time of last contact.
func (s *followerReplication) LastContact() time.Time {
s.lastContactLock.RLock()
last := s.lastContact
s.lastContactLock.RUnlock()
return last
}
// setLastContact sets the last contact to the current time.
func (s *followerReplication) setLastContact() {
s.lastContactLock.Lock()
s.lastContact = time.Now()
s.lastContactLock.Unlock()
}
// replicate is a long running routine that replicates log entries to a single
// follower.
func (r *Raft) replicate(s *followerReplication) {
// Start an async heartbeating routing
stopHeartbeat := make(chan struct{})
defer close(stopHeartbeat)
r.goFunc(func() { r.heartbeat(s, stopHeartbeat) })
RPC:
shouldStop := false
for !shouldStop {
select {
case maxIndex := <-s.stopCh:
// Make a best effort to replicate up to this index
if maxIndex > 0 {
r.replicateTo(s, maxIndex)
}
return
case deferErr := <-s.triggerDeferErrorCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
if !shouldStop {
deferErr.respond(nil)
} else {
deferErr.respond(fmt.Errorf("replication failed"))
}
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
// This is _not_ our heartbeat mechanism but is to ensure
// followers quickly learn the leader's commit index when
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}
// If things looks healthy, switch to pipeline mode
if !shouldStop && s.allowPipeline {
goto PIPELINE
}
}
return
PIPELINE:
// Disable until re-enabled
s.allowPipeline = false
// Replicates using a pipeline for high performance. This method
// is not able to gracefully recover from errors, and so we fall back
// to standard mode on failure.
if err := r.pipelineReplicate(s); err != nil {
if err != ErrPipelineReplicationNotSupported {
r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
}
}
goto RPC
}
// replicateTo is a helper to replicate(), used to replicate the logs up to a
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
// Create the base request
var req AppendEntriesRequest
var resp AppendEntriesResponse
var start time.Time
START:
// Prevent an excessive retry rate on errors
if s.failures > 0 {
select {
case <-time.After(backoff(failureWait, s.failures, maxFailureScale)):
case <-r.shutdownCh:
}
}
// Setup the request
if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound {
goto SEND_SNAP
} else if err != nil {
return
}
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true
}
// Update the last contact
s.setLastContact()
// Update s based on success
if resp.Success {
// Update our replication state
updateLastAppended(s, &req)
// Clear any failures, allow pipelining
s.failures = 0
s.allowPipeline = true
} else {
atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
CHECK_MORE:
// Poll the stop channel here in case we are looping and have been asked
// to stop, or have stepped down as leader. Even for the best effort case
// where we are asked to replicate to a given index and then shutdown,
// it's better to not loop in here to send lots of entries to a straggler
// that's leaving the cluster anyways.
select {
case <-s.stopCh:
return true
default:
}
// Check if there are more logs to replicate
if atomic.LoadUint64(&s.nextIndex) <= lastIndex {
goto START
}
return
// SEND_SNAP is used when we fail to get a log, usually because the follower
// is too far behind, and we must ship a snapshot down instead
SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop {
return true
} else if err != nil {
r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
return
}
// Check if there is more to replicate
goto CHECK_MORE
}
// sendLatestSnapshot is used to send the latest snapshot we have
// down to our follower.
func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
// Get the snapshots
snapshots, err := r.snapshots.List()
if err != nil {
r.logger.Error("failed to list snapshots", "error", err)
return false, err
}
// Check we have at least a single snapshot
if len(snapshots) == 0 {
return false, fmt.Errorf("no snapshots found")
}
// Open the most recent snapshot
snapID := snapshots[0].ID
meta, snapshot, err := r.snapshots.Open(snapID)
if err != nil {
r.logger.Error("failed to open snapshot", "id", snapID, "error", err)
return false, err
}
defer snapshot.Close()
// Setup the request
req := InstallSnapshotRequest{
RPCHeader: r.getRPCHeader(),
SnapshotVersion: meta.Version,
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: meta.Index,
LastLogTerm: meta.Term,
Peers: meta.Peers,
Size: meta.Size,
Configuration: EncodeConfiguration(meta.Configuration),
ConfigurationIndex: meta.ConfigurationIndex,
}
// Make the call
start := time.Now()
var resp InstallSnapshotResponse
if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
r.logger.Error("failed to install snapshot", "id", snapID, "error", err)
s.failures++
return false, err
}
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true, nil
}
// Update the last contact
s.setLastContact()
// Check for success
if resp.Success {
// Update the indexes
atomic.StoreUint64(&s.nextIndex, meta.Index+1)
s.commitment.match(s.peer.ID, meta.Index)
// Clear any failures
s.failures = 0
// Notify we are still leader
s.notifyAll(true)
} else {
s.failures++
r.logger.Warn("installSnapshot rejected to", "peer", s.peer)
}
return false, nil
}
// heartbeat is used to periodically invoke AppendEntries on a peer
// to ensure they don't time out. This is done async of replicate(),
// since that routine could potentially be blocked on disk IO.
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
var failures uint64
req := AppendEntriesRequest{
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localID, r.localAddr),
}
var resp AppendEntriesResponse
for {
// Wait for the next heartbeat interval or forced notify
select {
case <-s.notifyCh:
case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
case <-stopCh:
return
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
case <-stopCh:
}
} else {
s.setLastContact()
failures = 0
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
}
// pipelineReplicate is used when we have synchronized our state with the follower,
// and want to switch to a higher performance pipeline mode of replication.
// We only pipeline AppendEntries commands, and if we ever hit an error, we fall
// back to the standard replication which can handle more complex situations.
func (r *Raft) pipelineReplicate(s *followerReplication) error {
// Create a new pipeline
pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address)
if err != nil {
return err
}
defer pipeline.Close()
// Log start and stop of pipeline
r.logger.Info("pipelining replication", "peer", s.peer)
defer r.logger.Info("aborting pipeline replication", "peer", s.peer)
// Create a shutdown and finish channel
stopCh := make(chan struct{})
finishCh := make(chan struct{})
// Start a dedicated decoder
r.goFunc(func() { r.pipelineDecode(s, pipeline, stopCh, finishCh) })
// Start pipeline sends at the last good nextIndex
nextIndex := atomic.LoadUint64(&s.nextIndex)
shouldStop := false
SEND:
for !shouldStop {
select {
case <-finishCh:
break SEND
case maxIndex := <-s.stopCh:
// Make a best effort to replicate up to this index
if maxIndex > 0 {
r.pipelineSend(s, pipeline, &nextIndex, maxIndex)
}
break SEND
case deferErr := <-s.triggerDeferErrorCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
if !shouldStop {
deferErr.respond(nil)
} else {
deferErr.respond(fmt.Errorf("replication failed"))
}
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
}
}
// Stop our decoder, and wait for it to finish
close(stopCh)
select {
case <-finishCh:
case <-r.shutdownCh:
}
return nil
}
// pipelineSend is used to send data over a pipeline. It is a helper to
// pipelineReplicate.
func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *uint64, lastIndex uint64) (shouldStop bool) {
// Create a new append request
req := new(AppendEntriesRequest)
if err := r.setupAppendEntries(s, req, *nextIdx, lastIndex); err != nil {
return true
}
// Pipeline the append entries
if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil {
r.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err)
return true
}
// Increase the next send log to avoid re-sending old logs
if n := len(req.Entries); n > 0 {
last := req.Entries[n-1]
atomic.StoreUint64(nextIdx, last.Index+1)
}
return false
}
// pipelineDecode is used to decode the responses of pipelined requests.
func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, finishCh chan struct{}) {
defer close(finishCh)
respCh := p.Consumer()
for {
select {
case ready := <-respCh:
req, resp := ready.Request(), ready.Response()
appendStats(string(s.peer.ID), ready.Start(), float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return
}
// Update the last contact
s.setLastContact()
// Abort pipeline if not successful
if !resp.Success {
return
}
// Update our replication state
updateLastAppended(s, req)
case <-stopCh:
return
}
}
}
// setupAppendEntries is used to setup an append entries request.
func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
req.RPCHeader = r.getRPCHeader()
req.Term = s.currentTerm
req.Leader = r.trans.EncodePeer(r.localID, r.localAddr)
req.LeaderCommitIndex = r.getCommitIndex()
if err := r.setPreviousLog(req, nextIndex); err != nil {
return err
}
if err := r.setNewLogs(req, nextIndex, lastIndex); err != nil {
return err
}
return nil
}
// setPreviousLog is used to setup the PrevLogEntry and PrevLogTerm for an
// AppendEntriesRequest given the next index to replicate.
func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error {
// Guard for the first index, since there is no 0 log entry
// Guard against the previous index being a snapshot as well
lastSnapIdx, lastSnapTerm := r.getLastSnapshot()
if nextIndex == 1 {
req.PrevLogEntry = 0
req.PrevLogTerm = 0
} else if (nextIndex - 1) == lastSnapIdx {
req.PrevLogEntry = lastSnapIdx
req.PrevLogTerm = lastSnapTerm
} else {
var l Log
if err := r.logs.GetLog(nextIndex-1, &l); err != nil {
r.logger.Error("failed to get log", "index", nextIndex-1, "error", err)
return err
}
// Set the previous index and term (0 if nextIndex is 1)
req.PrevLogEntry = l.Index
req.PrevLogTerm = l.Term
}
return nil
}
// setNewLogs is used to setup the logs which should be appended for a request.
func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
// Append up to MaxAppendEntries or up to the lastIndex
req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries)
maxIndex := min(nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex)
for i := nextIndex; i <= maxIndex; i++ {
oldLog := new(Log)
if err := r.logs.GetLog(i, oldLog); err != nil {
r.logger.Error("failed to get log", "index", i, "error", err)
return err
}
req.Entries = append(req.Entries, oldLog)
}
return nil
}
// appendStats is used to emit stats about an AppendEntries invocation.
func appendStats(peer string, start time.Time, logs float32) {
labels := []metrics.Label{{Name: "peer_id", Value: peer}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
}
// handleStaleTerm is used when a follower indicates that we have a stale term.
func (r *Raft) handleStaleTerm(s *followerReplication) {
r.logger.Error("peer has newer term, stopping replication", "peer", s.peer)
s.notifyAll(false) // No longer leader
asyncNotifyCh(s.stepDown)
}
// updateLastAppended is used to update follower replication state after a
// successful AppendEntries RPC.
// TODO: This isn't used during InstallSnapshot, but the code there is similar.
func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {
// Mark any inflight logs as committed
if logs := req.Entries; len(logs) > 0 {
last := logs[len(logs)-1]
atomic.StoreUint64(&s.nextIndex, last.Index+1)
s.commitment.match(s.peer.ID, last.Index)
}
// Notify still leader
s.notifyAll(true)
}