1009 lines
33 KiB
Go

package raft
import (
"errors"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"time"
"github.com/armon/go-metrics"
)
var (
// ErrLeader is returned when an operation can't be completed on a
// leader node.
ErrLeader = errors.New("node is the leader")
// ErrNotLeader is returned when an operation can't be completed on a
// follower or candidate node.
ErrNotLeader = errors.New("node is not the leader")
// ErrLeadershipLost is returned when a leader fails to commit a log entry
// because it's been deposed in the process.
ErrLeadershipLost = errors.New("leadership lost while committing log")
// ErrAbortedByRestore is returned when a leader fails to commit a log
// entry because it's been superseded by a user snapshot restore.
ErrAbortedByRestore = errors.New("snapshot restored while committing log")
// ErrRaftShutdown is returned when operations are requested against an
// inactive Raft.
ErrRaftShutdown = errors.New("raft is already shutdown")
// ErrEnqueueTimeout is returned when a command fails due to a timeout.
ErrEnqueueTimeout = errors.New("timed out enqueuing operation")
// ErrNothingNewToSnapshot is returned when trying to create a snapshot
// but there's nothing new commited to the FSM since we started.
ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")
// ErrUnsupportedProtocol is returned when an operation is attempted
// that's not supported by the current protocol version.
ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version")
// ErrCantBootstrap is returned when attempt is made to bootstrap a
// cluster that already has state present.
ErrCantBootstrap = errors.New("bootstrap only works on new clusters")
)
// Raft implements a Raft node.
type Raft struct {
raftState
// protocolVersion is used to inter-operate with Raft servers running
// different versions of the library. See comments in config.go for more
// details.
protocolVersion ProtocolVersion
// applyCh is used to async send logs to the main thread to
// be committed and applied to the FSM.
applyCh chan *logFuture
// Configuration provided at Raft initialization
conf Config
// FSM is the client state machine to apply commands to
fsm FSM
// fsmMutateCh is used to send state-changing updates to the FSM. This
// receives pointers to commitTuple structures when applying logs or
// pointers to restoreFuture structures when restoring a snapshot. We
// need control over the order of these operations when doing user
// restores so that we finish applying any old log applies before we
// take a user snapshot on the leader, otherwise we might restore the
// snapshot and apply old logs to it that were in the pipe.
fsmMutateCh chan interface{}
// fsmSnapshotCh is used to trigger a new snapshot being taken
fsmSnapshotCh chan *reqSnapshotFuture
// lastContact is the last time we had contact from the
// leader node. This can be used to gauge staleness.
lastContact time.Time
lastContactLock sync.RWMutex
// Leader is the current cluster leader
leader ServerAddress
leaderLock sync.RWMutex
// leaderCh is used to notify of leadership changes
leaderCh chan bool
// leaderState used only while state is leader
leaderState leaderState
// Stores our local server ID, used to avoid sending RPCs to ourself
localID ServerID
// Stores our local addr
localAddr ServerAddress
// Used for our logging
logger *log.Logger
// LogStore provides durable storage for logs
logs LogStore
// Used to request the leader to make configuration changes.
configurationChangeCh chan *configurationChangeFuture
// Tracks the latest configuration and latest committed configuration from
// the log/snapshot.
configurations configurations
// RPC chan comes from the transport layer
rpcCh <-chan RPC
// Shutdown channel to exit, protected to prevent concurrent exits
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
// snapshots is used to store and retrieve snapshots
snapshots SnapshotStore
// userSnapshotCh is used for user-triggered snapshots
userSnapshotCh chan *userSnapshotFuture
// userRestoreCh is used for user-triggered restores of external
// snapshots
userRestoreCh chan *userRestoreFuture
// stable is a StableStore implementation for durable state
// It provides stable storage for many fields in raftState
stable StableStore
// The transport layer we use
trans Transport
// verifyCh is used to async send verify futures to the main thread
// to verify we are still the leader
verifyCh chan *verifyFuture
// configurationsCh is used to get the configuration data safely from
// outside of the main thread.
configurationsCh chan *configurationsFuture
// bootstrapCh is used to attempt an initial bootstrap from outside of
// the main thread.
bootstrapCh chan *bootstrapFuture
// List of observers and the mutex that protects them. The observers list
// is indexed by an artificial ID which is used for deregistration.
observersLock sync.RWMutex
observers map[uint64]*Observer
}
// BootstrapCluster initializes a server's storage with the given cluster
// configuration. This should only be called at the beginning of time for the
// cluster, and you absolutely must make sure that you call it with the same
// configuration on all the Voter servers. There is no need to bootstrap
// Nonvoter and Staging servers.
//
// One sane approach is to boostrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
}
// Sanity check the Raft peer configuration.
if err := checkConfiguration(configuration); err != nil {
return err
}
// Make sure the cluster is in a clean state.
hasState, err := HasExistingState(logs, stable, snaps)
if err != nil {
return fmt.Errorf("failed to check for existing state: %v", err)
}
if hasState {
return ErrCantBootstrap
}
// Set current term to 1.
if err := stable.SetUint64(keyCurrentTerm, 1); err != nil {
return fmt.Errorf("failed to save current term: %v", err)
}
// Append configuration entry to log.
entry := &Log{
Index: 1,
Term: 1,
}
if conf.ProtocolVersion < 3 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return fmt.Errorf("failed to append configuration entry to log: %v", err)
}
return nil
}
// RecoverCluster is used to manually force a new configuration in order to
// recover from a loss of quorum where the current configuration cannot be
// restored, such as when several servers die at the same time. This works by
// reading all the current state for this server, creating a snapshot with the
// supplied configuration, and then truncating the Raft log. This is the only
// safe way to force a given configuration without actually altering the log to
// insert any new entries, which could cause conflicts with other servers with
// different state.
//
// WARNING! This operation implicitly commits all entries in the Raft log, so
// in general this is an extremely unsafe operation. If you've lost your other
// servers and are performing a manual recovery, then you've also lost the
// commit information, so this is likely the best you can do, but you should be
// aware that calling this can cause Raft log entries that were in the process
// of being replicated but not yet be committed to be committed.
//
// Note the FSM passed here is used for the snapshot operations and will be
// left in a state that should not be used by the application. Be sure to
// discard this FSM and any associated state and provide a fresh one when
// calling NewRaft later.
//
// A typical way to recover the cluster is to shut down all servers and then
// run RecoverCluster on every server using an identical configuration. When
// the cluster is then restarted, and election should occur and then Raft will
// resume normal operation. If it's desired to make a particular server the
// leader, this can be used to inject a new configuration with that server as
// the sole voter, and then join up other new clean-state peer servers using
// the usual APIs in order to bring the cluster back into a known state.
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
}
// Sanity check the Raft peer configuration.
if err := checkConfiguration(configuration); err != nil {
return err
}
// Refuse to recover if there's no existing state. This would be safe to
// do, but it is likely an indication of an operator error where they
// expect data to be there and it's not. By refusing, we force them
// to show intent to start a cluster fresh by explicitly doing a
// bootstrap, rather than quietly fire up a fresh cluster here.
hasState, err := HasExistingState(logs, stable, snaps)
if err != nil {
return fmt.Errorf("failed to check for existing state: %v", err)
}
if !hasState {
return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
}
// Attempt to restore any snapshots we find, newest to oldest.
var snapshotIndex uint64
var snapshotTerm uint64
snapshots, err := snaps.List()
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
}
for _, snapshot := range snapshots {
_, source, err := snaps.Open(snapshot.ID)
if err != nil {
// Skip this one and try the next. We will detect if we
// couldn't open any snapshots.
continue
}
defer source.Close()
if err := fsm.Restore(source); err != nil {
// Same here, skip and try the next one.
continue
}
snapshotIndex = snapshot.Index
snapshotTerm = snapshot.Term
break
}
if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
return fmt.Errorf("failed to restore any of the available snapshots")
}
// The snapshot information is the best known end point for the data
// until we play back the Raft log entries.
lastIndex := snapshotIndex
lastTerm := snapshotTerm
// Apply any Raft log entries past the snapshot.
lastLogIndex, err := logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to find last log: %v", err)
}
for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
var entry Log
if err := logs.GetLog(index, &entry); err != nil {
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == LogCommand {
_ = fsm.Apply(&entry)
}
lastIndex = entry.Index
lastTerm = entry.Term
}
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
snapshot, err := fsm.Snapshot()
if err != nil {
return fmt.Errorf("failed to snapshot FSM: %v", err)
}
version := getSnapshotVersion(conf.ProtocolVersion)
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
if err := snapshot.Persist(sink); err != nil {
return fmt.Errorf("failed to persist snapshot: %v", err)
}
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
}
// Compact the log so that we don't get bad interference from any
// configuration change log entries that might be there.
firstLogIndex, err := logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %v", err)
}
if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
return fmt.Errorf("log compaction failed: %v", err)
}
return nil
}
// HasExistingState returns true if the server has any existing state (logs,
// knowledge of a current term, or any snapshots).
func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) {
// Make sure we don't have a current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err == nil {
if currentTerm > 0 {
return true, nil
}
} else {
if err.Error() != "not found" {
return false, fmt.Errorf("failed to read current term: %v", err)
}
}
// Make sure we have an empty log.
lastIndex, err := logs.LastIndex()
if err != nil {
return false, fmt.Errorf("failed to get last log index: %v", err)
}
if lastIndex > 0 {
return true, nil
}
// Make sure we have no snapshots
snapshots, err := snaps.List()
if err != nil {
return false, fmt.Errorf("failed to list snapshots: %v", err)
}
if len(snapshots) > 0 {
return true, nil
}
return false, nil
}
// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
// Validate the configuration.
if err := ValidateConfig(conf); err != nil {
return nil, err
}
// Ensure we have a LogOutput.
var logger *log.Logger
if conf.Logger != nil {
logger = conf.Logger
} else {
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}
logger = log.New(conf.LogOutput, "", log.LstdFlags)
}
// Try to restore the current term.
currentTerm, err := stable.GetUint64(keyCurrentTerm)
if err != nil && err.Error() != "not found" {
return nil, fmt.Errorf("failed to load current term: %v", err)
}
// Read the index of the last log entry.
lastIndex, err := logs.LastIndex()
if err != nil {
return nil, fmt.Errorf("failed to find last log: %v", err)
}
// Get the last log entry.
var lastLog Log
if lastIndex > 0 {
if err = logs.GetLog(lastIndex, &lastLog); err != nil {
return nil, fmt.Errorf("failed to get last log at index %d: %v", lastIndex, err)
}
}
// Make sure we have a valid server address and ID.
protocolVersion := conf.ProtocolVersion
localAddr := ServerAddress(trans.LocalAddr())
localID := conf.LocalID
// TODO (slackpad) - When we deprecate protocol version 2, remove this
// along with the AddPeer() and RemovePeer() APIs.
if protocolVersion < 3 && string(localID) != string(localAddr) {
return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address")
}
// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
rpcCh: trans.Consumer(),
snapshots: snaps,
userSnapshotCh: make(chan *userSnapshotFuture),
userRestoreCh: make(chan *userRestoreFuture),
shutdownCh: make(chan struct{}),
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
}
// Initialize as a follower.
r.setState(Follower)
// Start as leader if specified. This should only be used
// for testing purposes.
if conf.StartAsLeader {
r.setState(Leader)
r.setLeader(r.localAddr)
}
// Restore the current term and the last log.
r.setCurrentTerm(currentTerm)
r.setLastLog(lastLog.Index, lastLog.Term)
// Attempt to restore a snapshot if there are any.
if err := r.restoreSnapshot(); err != nil {
return nil, err
}
// Scan through the log for any configuration change entries.
snapshotIndex, _ := r.getLastSnapshot()
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
var entry Log
if err := r.logs.GetLog(index, &entry); err != nil {
r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", index, err)
panic(err)
}
r.processConfigurationLogEntry(&entry)
}
r.logger.Printf("[INFO] raft: Initial configuration (index=%d): %+v",
r.configurations.latestIndex, r.configurations.latest.Servers)
// Setup a heartbeat fast-path to avoid head-of-line
// blocking where possible. It MUST be safe for this
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
}
// restoreSnapshot attempts to restore the latest snapshots, and fails if none
// of them can be restored. This is called at initialization time, and is
// completely unsafe to call at any other time.
func (r *Raft) restoreSnapshot() error {
snapshots, err := r.snapshots.List()
if err != nil {
r.logger.Printf("[ERR] raft: Failed to list snapshots: %v", err)
return err
}
// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Printf("[ERR] raft: Failed to open snapshot %v: %v", snapshot.ID, err)
continue
}
defer source.Close()
if err := r.fsm.Restore(source); err != nil {
r.logger.Printf("[ERR] raft: Failed to restore snapshot %v: %v", snapshot.ID, err)
continue
}
// Log success
r.logger.Printf("[INFO] raft: Restored from snapshot %v", snapshot.ID)
// Update the lastApplied so we don't replay old logs
r.setLastApplied(snapshot.Index)
// Update the last stable snapshot info
r.setLastSnapshot(snapshot.Index, snapshot.Term)
// Update the configuration
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
}
// Success!
return nil
}
// If we had snapshots and failed to load them, its an error
if len(snapshots) > 0 {
return fmt.Errorf("failed to load any existing snapshots")
}
return nil
}
// BootstrapCluster is equivalent to non-member BootstrapCluster but can be
// called on an un-bootstrapped Raft instance after it has been created. This
// should only be called at the beginning of time for the cluster, and you
// absolutely must make sure that you call it with the same configuration on all
// the Voter servers. There is no need to bootstrap Nonvoter and Staging
// servers.
func (r *Raft) BootstrapCluster(configuration Configuration) Future {
bootstrapReq := &bootstrapFuture{}
bootstrapReq.init()
bootstrapReq.configuration = configuration
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.bootstrapCh <- bootstrapReq:
return bootstrapReq
}
}
// Leader is used to return the current leader of the cluster.
// It may return empty string if there is no current leader
// or the leader is unknown.
func (r *Raft) Leader() ServerAddress {
r.leaderLock.RLock()
leader := r.leader
r.leaderLock.RUnlock()
return leader
}
// Apply is used to apply a command to the FSM in a highly consistent
// manner. This returns a future that can be used to wait on the application.
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: cmd,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
// Barrier is used to issue a command that blocks until all preceeding
// operations have been applied to the FSM. It can be used to ensure the
// FSM reflects all queued writes. An optional timeout can be provided to
// limit the amount of time we wait for the command to be started. This
// must be run on the leader or it will fail.
func (r *Raft) Barrier(timeout time.Duration) Future {
metrics.IncrCounter([]string{"raft", "barrier"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogBarrier,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
// VerifyLeader is used to ensure the current node is still
// the leader. This can be done to prevent stale reads when a
// new leader has potentially been elected.
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
verifyFuture.init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.verifyCh <- verifyFuture:
return verifyFuture
}
}
// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
}
// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
// run on the leader or it will fail. Use AddVoter/AddNonvoter instead.
func (r *Raft) AddPeer(peer ServerAddress) Future {
if r.protocolVersion > 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: AddStaging,
serverID: ServerID(peer),
serverAddress: peer,
prevIndex: 0,
}, 0)
}
// RemovePeer (deprecated) is used to remove a peer from the cluster. If the
// current leader is being removed, it will cause a new election
// to occur. This must be run on the leader or it will fail.
// Use RemoveServer instead.
func (r *Raft) RemovePeer(peer ServerAddress) Future {
if r.protocolVersion > 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: RemoveServer,
serverID: ServerID(peer),
prevIndex: 0,
}, 0)
}
// AddVoter will add the given server to the cluster as a staging server. If the
// server is already in the cluster as a voter, this does nothing. This must be
// run on the leader or it will fail. The leader will promote the staging server
// to a voter once that server is ready. If nonzero, prevIndex is the index of
// the only configuration upon which this change may be applied; if another
// configuration entry has been added in the meantime, this request will fail.
// If nonzero, timeout is how long this server should wait before the
// configuration change log entry is appended.
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: AddStaging,
serverID: id,
serverAddress: address,
prevIndex: prevIndex,
}, timeout)
}
// AddNonvoter will add the given server to the cluster but won't assign it a
// vote. The server will receive log entries, but it won't participate in
// elections or log entry commitment. If the server is already in the cluster as
// a staging server or voter, this does nothing. This must be run on the leader
// or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: AddNonvoter,
serverID: id,
serverAddress: address,
prevIndex: prevIndex,
}, timeout)
}
// RemoveServer will remove the given server from the cluster. If the current
// leader is being removed, it will cause a new election to occur. This must be
// run on the leader or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 2 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: RemoveServer,
serverID: id,
prevIndex: prevIndex,
}, timeout)
}
// DemoteVoter will take away a server's vote, if it has one. If present, the
// server will continue to receive log entries, but it won't participate in
// elections or log entry commitment. If the server is not in the cluster, this
// does nothing. This must be run on the leader or it will fail. For prevIndex
// and timeout, see AddVoter.
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}
return r.requestConfigChange(configurationChangeRequest{
command: DemoteVoter,
serverID: id,
prevIndex: prevIndex,
}, timeout)
}
// Shutdown is used to stop the Raft background routines.
// This is not a graceful operation. Provides a future that
// can be used to block until all background routines have exited.
func (r *Raft) Shutdown() Future {
r.shutdownLock.Lock()
defer r.shutdownLock.Unlock()
if !r.shutdown {
close(r.shutdownCh)
r.shutdown = true
r.setState(Shutdown)
return &shutdownFuture{r}
}
// avoid closing transport twice
return &shutdownFuture{nil}
}
// Snapshot is used to manually force Raft to take a snapshot. Returns a future
// that can be used to block until complete, and that contains a function that
// can be used to open the snapshot.
func (r *Raft) Snapshot() SnapshotFuture {
future := &userSnapshotFuture{}
future.init()
select {
case r.userSnapshotCh <- future:
return future
case <-r.shutdownCh:
future.respond(ErrRaftShutdown)
return future
}
}
// Restore is used to manually force Raft to consume an external snapshot, such
// as if restoring from a backup. We will use the current Raft configuration,
// not the one from the snapshot, so that we can restore into a new cluster. We
// will also use the higher of the index of the snapshot, or the current index,
// and then add 1 to that, so we force a new state with a hole in the Raft log,
// so that the snapshot will be sent to followers and used for any new joiners.
// This can only be run on the leader, and blocks until the restore is complete
// or an error occurs.
//
// WARNING! This operation has the leader take on the state of the snapshot and
// then sets itself up so that it replicates that to its followers though the
// install snapshot process. This involves a potentially dangerous period where
// the leader commits ahead of its followers, so should only be used for disaster
// recovery into a fresh cluster, and should not be used in normal operations.
func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error {
metrics.IncrCounter([]string{"raft", "restore"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Perform the restore.
restore := &userRestoreFuture{
meta: meta,
reader: reader,
}
restore.init()
select {
case <-timer:
return ErrEnqueueTimeout
case <-r.shutdownCh:
return ErrRaftShutdown
case r.userRestoreCh <- restore:
// If the restore is ingested then wait for it to complete.
if err := restore.Error(); err != nil {
return err
}
}
// Apply a no-op log entry. Waiting for this allows us to wait until the
// followers have gotten the restore and replicated at least this new
// entry, which shows that we've also faulted and installed the
// snapshot with the contents of the restore.
noop := &logFuture{
log: Log{
Type: LogNoop,
},
}
noop.init()
select {
case <-timer:
return ErrEnqueueTimeout
case <-r.shutdownCh:
return ErrRaftShutdown
case r.applyCh <- noop:
return noop.Error()
}
}
// State is used to return the current raft state.
func (r *Raft) State() RaftState {
return r.getState()
}
// LeaderCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it. The channel is not buffered,
// and does not block on writes.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
}
// String returns a string representation of this Raft node.
func (r *Raft) String() string {
return fmt.Sprintf("Node at %s [%v]", r.localAddr, r.getState())
}
// LastContact returns the time of last contact by a leader.
// This only makes sense if we are currently a follower.
func (r *Raft) LastContact() time.Time {
r.lastContactLock.RLock()
last := r.lastContact
r.lastContactLock.RUnlock()
return last
}
// Stats is used to return a map of various internal stats. This
// should only be used for informative purposes or debugging.
//
// Keys are: "state", "term", "last_log_index", "last_log_term",
// "commit_index", "applied_index", "fsm_pending",
// "last_snapshot_index", "last_snapshot_term",
// "latest_configuration", "last_contact", and "num_peers".
//
// The value of "state" is a numerical value representing a
// RaftState const.
//
// The value of "latest_configuration" is a string which contains
// the id of each server, its suffrage status, and its address.
//
// The value of "last_contact" is either "never" if there
// has been no contact with a leader, "0" if the node is in the
// leader state, or the time since last contact with a leader
// formatted as a string.
//
// The value of "num_peers" is the number of other voting servers in the
// cluster, not including this node. If this node isn't part of the
// configuration then this will be "0".
//
// All other values are uint64s, formatted as strings.
func (r *Raft) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
lastLogIndex, lastLogTerm := r.getLastLog()
lastSnapIndex, lastSnapTerm := r.getLastSnapshot()
s := map[string]string{
"state": r.getState().String(),
"term": toString(r.getCurrentTerm()),
"last_log_index": toString(lastLogIndex),
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmMutateCh))),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"protocol_version": toString(uint64(r.protocolVersion)),
"protocol_version_min": toString(uint64(ProtocolVersionMin)),
"protocol_version_max": toString(uint64(ProtocolVersionMax)),
"snapshot_version_min": toString(uint64(SnapshotVersionMin)),
"snapshot_version_max": toString(uint64(SnapshotVersionMax)),
}
future := r.GetConfiguration()
if err := future.Error(); err != nil {
r.logger.Printf("[WARN] raft: could not get configuration for Stats: %v", err)
} else {
configuration := future.Configuration()
s["latest_configuration_index"] = toString(future.Index())
s["latest_configuration"] = fmt.Sprintf("%+v", configuration.Servers)
// This is a legacy metric that we've seen people use in the wild.
hasUs := false
numPeers := 0
for _, server := range configuration.Servers {
if server.Suffrage == Voter {
if server.ID == r.localID {
hasUs = true
} else {
numPeers++
}
}
}
if !hasUs {
numPeers = 0
}
s["num_peers"] = toString(uint64(numPeers))
}
last := r.LastContact()
if r.getState() == Leader {
s["last_contact"] = "0"
} else if last.IsZero() {
s["last_contact"] = "never"
} else {
s["last_contact"] = fmt.Sprintf("%v", time.Now().Sub(last))
}
return s
}
// LastIndex returns the last index in stable storage,
// either from the last log or from the last snapshot.
func (r *Raft) LastIndex() uint64 {
return r.getLastIndex()
}
// AppliedIndex returns the last index applied to the FSM. This is generally
// lagging behind the last index, especially for indexes that are persisted but
// have not yet been considered committed by the leader. NOTE - this reflects
// the last index that was sent to the application's FSM over the apply channel
// but DOES NOT mean that the application's FSM has yet consumed it and applied
// it to its internal state. Thus, the application's state may lag behind this
// index.
func (r *Raft) AppliedIndex() uint64 {
return r.getLastApplied()
}