James Phillips c01a3871c9 Adds support for snapshots and restores. (#2396)
* Updates Raft library to get new snapshot/restore API.

* Basic backup and restore working, but need some cleanup.

* Breaks out a snapshot module and adds a SHA256 integrity check.

* Adds snapshot ACL and fills in some missing comments.

* Require a consistent read for snapshots.

* Make sure snapshot works if ACLs aren't enabled.

* Adds a bit of package documentation.

* Returns an empty response from restore to avoid EOF errors.

* Adds API client support for snapshots.

* Makes internal file names match on-disk file snapshots.

* Adds DC and token coverage for snapshot API test.

* Adds missing documentation.

* Adds a unit test for the snapshot client endpoint.

* Moves the connection pool out of the client for easier testing.

* Fixes an incidental issue in the prepared query unit test.

I realized I had two servers in bootstrap mode so this wasn't a good setup.

* Adds a half close to the TCP stream and fixes panic on error.

* Adds client and endpoint tests for snapshots.

* Moves the pool back into the snapshot RPC client.

* Adds a TLS test and fixes half-closes for TLS connections.

* Tweaks some comments.

* Adds a low-level snapshot test.

This is independent of Consul so we can pull this out into a library
later if we want to.

* Cleans up snapshot and archive and completes archive tests.

* Sends a clear error for snapshot operations in dev mode.

Snapshots require the Raft snapshots to be readable, which isn't supported
in dev mode. Send a clear error instead of a deep-down Raft one.

* Adds docs for the snapshot endpoint.

* Adds a stale mode and index feedback for snapshot saves.

This gives folks a way to extract data even if the cluster has no
leader.

* Changes the internal format of a snapshot from zip to tgz.

* Pulls in Raft fix to cancel inflight before a restore.

* Pulls in new Raft restore interface.

* Adds metadata to snapshot saves and a verify function.

* Adds basic save and restore snapshot CLI commands.

* Gets rid of tarball extensions and adds restore message.

* Fixes an incidental bad link in the KV docs.

* Adds documentation for the snapshot CLI commands.

* Scuttle any request body when a snapshot is saved.

* Fixes archive unit test error message check.

* Allows for nil output writers in snapshot RPC handlers.

* Renames hash list Decode to DecodeAndVerify.

* Closes the client connection for snapshot ops.

* Lowers timeout for restore ops.

* Updates Raft vendor to get new Restore signature and integrates with Consul.

* Bounces the leader's internal state when we do a restore.
2016-10-25 19:20:24 -07:00

290 lines
6.7 KiB
Go

package raft
import (
"fmt"
"io"
"sync"
"time"
)
// Future is used to represent an action that may occur in the future.
type Future interface {
// Error blocks until the future arrives and then
// returns the error status of the future.
// This may be called any number of times - all
// calls will return the same value.
// Note that it is not OK to call this method
// twice concurrently on the same Future instance.
Error() error
}
// IndexFuture is used for future actions that can result in a raft log entry
// being created.
type IndexFuture interface {
Future
// Index holds the index of the newly applied log entry.
// This must not be called until after the Error method has returned.
Index() uint64
}
// ApplyFuture is used for Apply and can return the FSM response.
type ApplyFuture interface {
IndexFuture
// Response returns the FSM response as returned
// by the FSM.Apply method. This must not be called
// until after the Error method has returned.
Response() interface{}
}
// ConfigurationFuture is used for GetConfiguration and can return the
// latest configuration in use by Raft.
type ConfigurationFuture interface {
IndexFuture
// Configuration contains the latest configuration. This must
// not be called until after the Error method has returned.
Configuration() Configuration
}
// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
type SnapshotFuture interface {
Future
// Open is a function you can call to access the underlying snapshot and
// its metadata. This must not be called until after the Error method
// has returned.
Open() (*SnapshotMeta, io.ReadCloser, error)
}
// errorFuture is used to return a static error.
type errorFuture struct {
err error
}
func (e errorFuture) Error() error {
return e.err
}
func (e errorFuture) Response() interface{} {
return nil
}
func (e errorFuture) Index() uint64 {
return 0
}
// deferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
err error
errCh chan error
responded bool
}
func (d *deferError) init() {
d.errCh = make(chan error, 1)
}
func (d *deferError) Error() error {
if d.err != nil {
// Note that when we've received a nil error, this
// won't trigger, but the channel is closed after
// send so we'll still return nil below.
return d.err
}
if d.errCh == nil {
panic("waiting for response on nil channel")
}
d.err = <-d.errCh
return d.err
}
func (d *deferError) respond(err error) {
if d.errCh == nil {
return
}
if d.responded {
return
}
d.errCh <- err
close(d.errCh)
d.responded = true
}
// There are several types of requests that cause a configuration entry to
// be appended to the log. These are encoded here for leaderLoop() to process.
// This is internal to a single server.
type configurationChangeFuture struct {
logFuture
req configurationChangeRequest
}
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
// Raft object's BootstrapCluster member function for more details.
type bootstrapFuture struct {
deferError
// configuration is the proposed bootstrap configuration to apply.
configuration Configuration
}
// logFuture is used to apply a log entry and waits until
// the log is considered committed.
type logFuture struct {
deferError
log Log
response interface{}
dispatch time.Time
}
func (l *logFuture) Response() interface{} {
return l.response
}
func (l *logFuture) Index() uint64 {
return l.log.Index
}
type shutdownFuture struct {
raft *Raft
}
func (s *shutdownFuture) Error() error {
if s.raft == nil {
return nil
}
s.raft.waitShutdown()
if closeable, ok := s.raft.trans.(WithClose); ok {
closeable.Close()
}
return nil
}
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
// complete.
type userSnapshotFuture struct {
deferError
// opener is a function used to open the snapshot. This is filled in
// once the future returns with no error.
opener func() (*SnapshotMeta, io.ReadCloser, error)
}
// Open is a function you can call to access the underlying snapshot and its
// metadata.
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
if u.opener == nil {
return nil, nil, fmt.Errorf("no snapshot available")
} else {
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
}
}
// userRestoreFuture is used for waiting on a user-triggered restore of an
// external snapshot to complete.
type userRestoreFuture struct {
deferError
// meta is the metadata that belongs with the snapshot.
meta *SnapshotMeta
// reader is the interface to read the snapshot contents from.
reader io.Reader
}
// reqSnapshotFuture is used for requesting a snapshot start.
// It is only used internally.
type reqSnapshotFuture struct {
deferError
// snapshot details provided by the FSM runner before responding
index uint64
term uint64
snapshot FSMSnapshot
}
// restoreFuture is used for requesting an FSM to perform a
// snapshot restore. Used internally only.
type restoreFuture struct {
deferError
ID string
}
// verifyFuture is used to verify the current node is still
// the leader. This is to prevent a stale read.
type verifyFuture struct {
deferError
notifyCh chan *verifyFuture
quorumSize int
votes int
voteLock sync.Mutex
}
// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
deferError
configurations configurations
}
// Configuration returns the latest configuration in use by Raft.
func (c *configurationsFuture) Configuration() Configuration {
return c.configurations.latest
}
// Index returns the index of the latest configuration in use by Raft.
func (c *configurationsFuture) Index() uint64 {
return c.configurations.latestIndex
}
// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {
v.voteLock.Lock()
defer v.voteLock.Unlock()
// Guard against having notified already
if v.notifyCh == nil {
return
}
if leader {
v.votes++
if v.votes >= v.quorumSize {
v.notifyCh <- v
v.notifyCh = nil
}
} else {
v.notifyCh <- v
v.notifyCh = nil
}
}
// appendFuture is used for waiting on a pipelined append
// entries RPC.
type appendFuture struct {
deferError
start time.Time
args *AppendEntriesRequest
resp *AppendEntriesResponse
}
func (a *appendFuture) Start() time.Time {
return a.start
}
func (a *appendFuture) Request() *AppendEntriesRequest {
return a.args
}
func (a *appendFuture) Response() *AppendEntriesResponse {
return a.resp
}