raft: update raft to v1.1.2 (#7079)

* update raft
* use hclogger for raft.
Hans Hasselberg 2020-01-20 13:58:02 +01:00 committed by GitHub
parent c9dbcc31ec
commit 9c1361c02b
28 changed files with 700 additions and 349 deletions

@ -580,13 +580,20 @@ func (s *Server) setupRaft() error {
serverAddressProvider = s.serverLookup
}
raftLogger := hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(s.config.LogLevel),
Output: s.config.LogOutput,
TimeFormat: `2006/01/02 15:04:05`,
})
// Create a transport layer.
transConfig := &raft.NetworkTransportConfig{
Stream: s.raftLayer,
MaxPool: 3,
Timeout: 10 * time.Second,
ServerAddressProvider: serverAddressProvider,
Logger: s.logger,
Logger: raftLogger,
}
trans := raft.NewNetworkTransportWithConfig(transConfig)
@ -594,12 +601,6 @@ func (s *Server) setupRaft() error {
// Make sure we set the LogOutput.
s.config.RaftConfig.LogOutput = s.config.LogOutput
raftLogger := hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(s.config.LogLevel),
Output: s.config.LogOutput,
TimeFormat: `2006/01/02 15:04:05`,
})
s.config.RaftConfig.Logger = raftLogger
// Versions of the Raft protocol below 3 require the LocalID to match the network

go.mod (2 changes)
@ -49,7 +49,7 @@ require (
github.com/hashicorp/mdns v1.0.1 // indirect
github.com/hashicorp/memberlist v0.1.5
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.1.1
github.com/hashicorp/raft v1.1.2
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.8.5
github.com/hashicorp/vault/api v1.0.4

go.sum (6 changes)
@ -187,6 +187,8 @@ github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c=
github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
@ -349,8 +351,6 @@ github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59b
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/crypto v0.0.0-20191106202628-ed6320f186d4 h1:PDpCLFAH/YIX0QpHPf2eO7L4rC2OOirBrKtXTLLiNTY=
@ -398,8 +398,6 @@ golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c h1:hDn6jm7snBX2O7+EeTk6Q4WXJ
golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

vendor/github.com/hashicorp/raft/.golangci-lint.yml (new generated vendored file, 49 lines)
@ -0,0 +1,49 @@
run:
deadline: 5m
linters-settings:
govet:
check-shadowing: true
golint:
min-confidence: 0
linters:
disable-all: true
enable:
- gofmt
#- golint
- govet
#- varcheck
#- typecheck
#- gosimple
issues:
exclude-use-default: false
exclude:
# ignore the false positive errors resulting from not including a comment above every `package` keyword
- should have a package comment, unless it's in another file for this package (golint)
# golint: Annoying issue about not having a comment. The rare codebase has such comments
# - (comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form)
# errcheck: Almost all programs ignore errors on these functions and in most cases it's ok
- Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*printf?|os\.(Un)?Setenv). is not checked
# golint: False positive when tests are defined in package 'test'
- func name will be used as test\.Test.* by other packages, and that stutters; consider calling this
# staticcheck: Developers tend to write in C-style with an
# explicit 'break' in a 'switch', so it's ok to ignore
- ineffective break statement. Did you mean to break out of the outer loop
# gosec: Too many false-positives on 'unsafe' usage
- Use of unsafe calls should be audited
# gosec: Too many false-positives for parametrized shell calls
- Subprocess launch(ed with variable|ing should be audited)
# gosec: Duplicated errcheck checks
- G104
# gosec: Too many issues in popular repos
- (Expect directory permissions to be 0750 or less|Expect file permissions to be 0600 or less)
# gosec: False positive is triggered by 'src, err := ioutil.ReadFile(filename)'
- Potential file inclusion via variable

@ -8,7 +8,10 @@ go:
- 1.12
- tip
install: make deps
install:
- make deps
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin latest
script:
- make integ

@ -1,4 +1,21 @@
# UNRELEASED
# 1.1.2 (January 17th, 2020)
FEATURES
* Improve FSM apply performance through batching. Implementing the `BatchingFSM` interface enables this new feature [[GH-364](https://github.com/hashicorp/raft/pull/364)]
* Add ability to obtain Raft configuration before Raft starts with GetConfiguration [[GH-369](https://github.com/hashicorp/raft/pull/369)]
IMPROVEMENTS
* Remove lint violations and add a `make` rule for running the linter.
* Replace logger with hclog [[GH-360](https://github.com/hashicorp/raft/pull/360)]
* Read latest configuration independently from main loop [[GH-379](https://github.com/hashicorp/raft/pull/379)]
BUG FIXES
* Export the leader field in LeaderObservation [[GH-357](https://github.com/hashicorp/raft/pull/357)]
* Fix snapshot to not attempt to truncate a negative range [[GH-358](https://github.com/hashicorp/raft/pull/358)]
* Check for shutdown in inmemPipeline before sending RPCs [[GH-276](https://github.com/hashicorp/raft/pull/276)]
# 1.1.1 (July 23rd, 2019)
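The 1.1.2 changelog above calls out a new package-level GetConfiguration helper; its definition appears in the api.go hunks further down. Below is a minimal sketch of how a caller might use it to inspect cluster membership before starting a node. The in-memory stores, the nopFSM type, and the "node-1" ID are illustrative stand-ins for whatever stores, FSM, and ID the node is normally constructed with.

package main

import (
	"io"
	"log"

	"github.com/hashicorp/raft"
)

// nopFSM is a placeholder FSM; GetConfiguration never applies logs to it.
type nopFSM struct{}

func (nopFSM) Apply(*raft.Log) interface{}         { return nil }
func (nopFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, nil }
func (nopFSM) Restore(io.ReadCloser) error         { return nil }

func main() {
	conf := raft.DefaultConfig()
	conf.LocalID = "node-1" // hypothetical ID

	// In-memory stores stand in for the node's real LogStore, StableStore,
	// SnapshotStore, and Transport; a real caller would pass the same stores
	// the node is normally built with.
	store := raft.NewInmemStore()
	snaps := raft.NewInmemSnapshotStore()
	_, trans := raft.NewInmemTransport("")

	cfg, err := raft.GetConfiguration(conf, nopFSM{}, store, store, snaps, trans)
	if err != nil {
		log.Fatalf("failed to read raft configuration: %v", err)
	}
	for _, srv := range cfg.Servers {
		log.Printf("server %s at %s", srv.ID, srv.Address)
	}
}

Against empty stores this returns an empty Servers list; against a node's real stores it reports the persisted membership without starting any background goroutines, since conf.skipStartup short-circuits NewRaft, as the api.go and config.go hunks below show.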

@ -1,30 +1,57 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
ENV = $(shell go env GOPATH)
GO_VERSION = $(shell go version)
GOLANG_CI_VERSION = v1.19.0
# Look for versions prior to 1.10 which have a different fmt output
# and don't lint with gofmt against them.
ifneq (,$(findstring go version go1.8, $(GO_VERSION)))
FMT=
else ifneq (,$(findstring go version go1.9, $(GO_VERSION)))
FMT=
else
FMT=--enable gofmt
endif
TEST_RESULTS_DIR?=/tmp/test-results
test:
go test -timeout=60s -race .
go test $(TESTARGS) -timeout=60s -race .
go test $(TESTARGS) -timeout=60s -tags batchtest -race .
integ: test
INTEG_TESTS=yes go test -timeout=25s -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ .
ci.test-norace:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest
ci.test:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race .
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest .
ci.integ: ci.test
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ .
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest .
fuzz:
go test -timeout=300s ./fuzzy
go test $(TESTARGS) -timeout=20m ./fuzzy
go test $(TESTARGS) -timeout=20m -tags batchtest ./fuzzy
deps:
go get -t -d -v ./...
echo $(DEPS) | xargs -n1 go get -d
lint:
gofmt -s -w .
golangci-lint run -c .golangci-lint.yml $(FMT) .
dep-linter:
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(ENV)/bin $(GOLANG_CI_VERSION)
cov:
INTEG_TESTS=yes gocov test github.com/hashicorp/raft | gocov-html > /tmp/coverage.html
open /tmp/coverage.html
.PHONY: test cov integ deps
.PHONY: test cov integ deps dep-linter lint

@ -7,11 +7,11 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
hclog "github.com/hashicorp/go-hclog"
)
const (
@ -139,6 +139,10 @@ type Raft struct {
// the log/snapshot.
configurations configurations
// Holds a copy of the latest configuration which can be read
// independently from main loop.
latestConfiguration atomic.Value
// RPC chan comes from the transport layer
rpcCh <-chan RPC
@ -234,7 +238,7 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
entry.Data = encodePeers(configuration, trans)
} else {
entry.Type = LogConfiguration
entry.Data = encodeConfiguration(configuration)
entry.Data = EncodeConfiguration(configuration)
}
if err := logs.StoreLog(entry); err != nil {
return fmt.Errorf("failed to append configuration entry to log: %v", err)
@ -288,37 +292,36 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// expect data to be there and it's not. By refusing, we force them
// to show intent to start a cluster fresh by explicitly doing a
// bootstrap, rather than quietly fire up a fresh cluster here.
hasState, err := HasExistingState(logs, stable, snaps)
if err != nil {
if hasState, err := HasExistingState(logs, stable, snaps); err != nil {
return fmt.Errorf("failed to check for existing state: %v", err)
}
if !hasState {
} else if !hasState {
return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
}
// Attempt to restore any snapshots we find, newest to oldest.
var snapshotIndex uint64
var snapshotTerm uint64
snapshots, err := snaps.List()
var (
snapshotIndex uint64
snapshotTerm uint64
snapshots, err = snaps.List()
)
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
}
for _, snapshot := range snapshots {
if !conf.NoSnapshotRestoreOnStart {
_, source, err := snaps.Open(snapshot.ID)
if err != nil {
// Skip this one and try the next. We will detect if we
// couldn't open any snapshots.
continue
}
var source io.ReadCloser
_, source, err = snaps.Open(snapshot.ID)
if err != nil {
// Skip this one and try the next. We will detect if we
// couldn't open any snapshots.
continue
}
err = fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
if err != nil {
// Same here, skip and try the next one.
continue
}
err = fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
if err != nil {
// Same here, skip and try the next one.
continue
}
snapshotIndex = snapshot.Index
@ -341,7 +344,7 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
}
for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
var entry Log
if err := logs.GetLog(index, &entry); err != nil {
if err = logs.GetLog(index, &entry); err != nil {
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == LogCommand {
@ -362,10 +365,10 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
if err := snapshot.Persist(sink); err != nil {
if err = snapshot.Persist(sink); err != nil {
return fmt.Errorf("failed to persist snapshot: %v", err)
}
if err := sink.Close(); err != nil {
if err = sink.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
}
@ -382,6 +385,23 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return nil
}
// GetConfiguration returns the configuration of the Raft cluster without
// starting a Raft instance or connecting to the cluster
// This function has identical behavior to Raft.GetConfiguration
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
conf.skipStartup = true
r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
if err != nil {
return Configuration{}, err
}
future := r.GetConfiguration()
if err = future.Error(); err != nil {
return Configuration{}, err
}
return future.Configuration(), nil
}
// HasExistingState returns true if the server has any existing state (logs,
// knowledge of a current term, or any snapshots).
func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) {
@ -528,19 +548,23 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
var entry Log
if err := r.logs.GetLog(index, &entry); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", index, err))
r.logger.Error("failed to get log", "index", index, "error", err)
panic(err)
}
r.processConfigurationLogEntry(&entry)
}
r.logger.Info(fmt.Sprintf("Initial configuration (index=%d): %+v",
r.configurations.latestIndex, r.configurations.latest.Servers))
r.logger.Info("initial configuration",
"index", r.configurations.latestIndex,
"servers", hclog.Fmt("%+v", r.configurations.latest.Servers))
// Setup a heartbeat fast-path to avoid head-of-line
// blocking where possible. It MUST be safe for this
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)
if conf.skipStartup {
return r, nil
}
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
@ -554,7 +578,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
func (r *Raft) restoreSnapshot() error {
snapshots, err := r.snapshots.List()
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err))
r.logger.Error("failed to list snapshots", "error", err)
return err
}
@ -563,7 +587,7 @@ func (r *Raft) restoreSnapshot() error {
if !r.conf.NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapshot.ID, err))
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
continue
}
@ -571,11 +595,11 @@ func (r *Raft) restoreSnapshot() error {
// Close the source after the restore has completed
source.Close()
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to restore snapshot %v: %v", snapshot.ID, err))
r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
continue
}
r.logger.Info(fmt.Sprintf("Restored from snapshot %v", snapshot.ID))
r.logger.Info("restored from snapshot", "id", snapshot.ID)
}
// Update the lastApplied so we don't replay old logs
r.setLastApplied(snapshot.Index)
@ -584,18 +608,17 @@ func (r *Raft) restoreSnapshot() error {
r.setLastSnapshot(snapshot.Index, snapshot.Term)
// Update the configuration
var conf Configuration
var index uint64
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
conf = decodePeers(snapshot.Peers, r.trans)
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)
// Success!
return nil
@ -727,19 +750,14 @@ func (r *Raft) VerifyLeader() Future {
}
}
// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
configReq.respond(nil)
return configReq
}
// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
@ -1013,7 +1031,7 @@ func (r *Raft) Stats() map[string]string {
future := r.GetConfiguration()
if err := future.Error(); err != nil {
r.logger.Warn(fmt.Sprintf("could not get configuration for Stats: %v", err))
r.logger.Warn("could not get configuration for stats", "error", err)
} else {
configuration := future.Configuration()
s["latest_configuration_index"] = toString(future.Index())

@ -35,7 +35,7 @@ type AppendEntriesRequest struct {
LeaderCommitIndex uint64
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -59,7 +59,7 @@ type AppendEntriesResponse struct {
NoRetryBackoff bool
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -83,7 +83,7 @@ type RequestVoteRequest struct {
LeadershipTransfer bool
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *RequestVoteRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -104,7 +104,7 @@ type RequestVoteResponse struct {
Granted bool
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -136,7 +136,7 @@ type InstallSnapshotRequest struct {
Size int64
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -150,7 +150,7 @@ type InstallSnapshotResponse struct {
Success bool
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -161,7 +161,7 @@ type TimeoutNowRequest struct {
RPCHeader
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
@ -171,7 +171,7 @@ type TimeoutNowResponse struct {
RPCHeader
}
// See WithRPCHeader.
// GetRPCHeader - See WithRPCHeader.
func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

@ -8,8 +8,8 @@ import (
"github.com/hashicorp/go-hclog"
)
// These are the versions of the protocol (which includes RPC messages as
// well as Raft-specific log entries) that this server can _understand_. Use
// ProtocolVersion is the version of the protocol (which includes RPC messages
// as well as Raft-specific log entries) that this server can _understand_. Use
// the ProtocolVersion member of the Config object to control the version of
// the protocol to use when _speaking_ to other servers. Note that depending on
// the protocol version being spoken, some otherwise understood RPC messages
@ -88,13 +88,15 @@ import (
type ProtocolVersion int
const (
// ProtocolVersionMin is the minimum protocol version
ProtocolVersionMin ProtocolVersion = 0
ProtocolVersionMax = 3
// ProtocolVersionMax is the maximum protocol version
ProtocolVersionMax = 3
)
// These are versions of snapshots that this server can _understand_. Currently,
// it is always assumed that this server generates the latest version, though
// this may be changed in the future to include a configurable version.
// SnapshotVersion is the version of snapshots that this server can understand.
// Currently, it is always assumed that the server generates the latest version,
// though this may be changed in the future to include a configurable version.
//
// Version History
//
@ -112,8 +114,10 @@ const (
type SnapshotVersion int
const (
// SnapshotVersionMin is the minimum snapshot version
SnapshotVersionMin SnapshotVersion = 0
SnapshotVersionMax = 1
// SnapshotVersionMax is the maximum snapshot version
SnapshotVersionMax = 1
)
// Config provides any necessary configuration for the Raft server.
@ -202,10 +206,13 @@ type Config struct {
// NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the
// FSM on start. This is useful if your FSM recovers from other mechanisms
// than raft snapshotting. Snapshot metadata will still be used to initalize
// than raft snapshotting. Snapshot metadata will still be used to initialize
// raft's configuration and index values. This is used in NewRaft and
// RestoreCluster.
NoSnapshotRestoreOnStart bool
// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
// DefaultConfig returns a Config with usable defaults.

@ -342,9 +342,9 @@ func decodePeers(buf []byte, trans Transport) Configuration {
}
}
// encodeConfiguration serializes a Configuration using MsgPack, or panics on
// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
// errors.
func encodeConfiguration(configuration Configuration) []byte {
func EncodeConfiguration(configuration Configuration) []byte {
buf, err := encodeMsgPack(configuration)
if err != nil {
panic(fmt.Errorf("failed to encode configuration: %v", err))
@ -352,9 +352,9 @@ func encodeConfiguration(configuration Configuration) []byte {
return buf.Bytes()
}
// decodeConfiguration deserializes a Configuration using MsgPack, or panics on
// DecodeConfiguration deserializes a Configuration using MsgPack, or panics on
// errors.
func decodeConfiguration(buf []byte) Configuration {
func DecodeConfiguration(buf []byte) Configuration {
var configuration Configuration
if err := decodeMsgPack(buf, &configuration); err != nil {
panic(fmt.Errorf("failed to decode configuration: %v", err))

@ -12,6 +12,11 @@ import (
// suitable for testing.
type DiscardSnapshotStore struct{}
// DiscardSnapshotSink is used to fulfill the SnapshotSink interface
// while always discarding the snapshot. This is useful for when the log
// should be truncated but no snapshot should be retained. This
// should never be used for production use, and is only suitable
// for testing.
type DiscardSnapshotSink struct{}
// NewDiscardSnapshotStore is used to create a new DiscardSnapshotStore.
@ -19,31 +24,41 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore {
return &DiscardSnapshotStore{}
}
// Create returns a valid type implementing the SnapshotSink which
// always discards the snapshot.
func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64,
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
return &DiscardSnapshotSink{}, nil
}
// List returns successfully with a nil for []*SnapshotMeta.
func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error) {
return nil, nil
}
// Open returns an error since the DiscardSnapshotStore does not
// support opening snapshots.
func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
return nil, nil, fmt.Errorf("open is not supported")
}
// Write returns successfully with the length of the input byte slice
// to satisfy the WriteCloser interface
func (d *DiscardSnapshotSink) Write(b []byte) (int, error) {
return len(b), nil
}
// Close returns a nil error
func (d *DiscardSnapshotSink) Close() error {
return nil
}
// ID returns "discard" for DiscardSnapshotSink
func (d *DiscardSnapshotSink) ID() string {
return "discard"
}
// Cancel returns successfully with a nil error
func (d *DiscardSnapshotSink) Cancel() error {
return nil
}

@ -5,11 +5,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/hashicorp/go-hclog"
"hash"
"hash/crc64"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"runtime"
@ -31,7 +31,7 @@ const (
type FileSnapshotStore struct {
path string
retain int
logger *log.Logger
logger hclog.Logger
}
type snapMetaSlice []*fileSnapshotMeta
@ -39,7 +39,7 @@ type snapMetaSlice []*fileSnapshotMeta
// FileSnapshotSink implements SnapshotSink with a file.
type FileSnapshotSink struct {
store *FileSnapshotStore
logger *log.Logger
logger hclog.Logger
dir string
parentDir string
meta fileSnapshotMeta
@ -76,12 +76,16 @@ func (b *bufferedFile) Close() error {
// NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based
// on a base directory. The `retain` parameter controls how many
// snapshots are retained. Must be at least 1.
func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger) (*FileSnapshotStore, error) {
func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) {
if retain < 1 {
return nil, fmt.Errorf("must retain at least one snapshot")
}
if logger == nil {
logger = log.New(os.Stderr, "", log.LstdFlags)
logger = hclog.New(&hclog.LoggerOptions{
Name: "snapshot",
Output: hclog.DefaultOutput,
Level: hclog.DefaultLevel,
})
}
// Ensure our path exists
@ -111,7 +115,11 @@ func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSn
if logOutput == nil {
logOutput = os.Stderr
}
return NewFileSnapshotStoreWithLogger(base, retain, log.New(logOutput, "", log.LstdFlags))
return NewFileSnapshotStoreWithLogger(base, retain, hclog.New(&hclog.LoggerOptions{
Name: "snapshot",
Output: logOutput,
Level: hclog.DefaultLevel,
}))
}
// testPermissions tries to touch a file in our path to see if it works.
@ -150,11 +158,11 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
// Create a new path
name := snapshotName(term, index)
path := filepath.Join(f.path, name+tmpSuffix)
f.logger.Printf("[INFO] snapshot: Creating new snapshot at %s", path)
f.logger.Info("creating new snapshot", "path", path)
// Make the directory
if err := os.MkdirAll(path, 0755); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to make snapshot directory: %v", err)
f.logger.Error("failed to make snapshot directly", "error", err)
return nil, err
}
@ -180,7 +188,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
// Write out the meta data
if err := sink.writeMeta(); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err)
f.logger.Error("failed to write metadata", "error", err)
return nil, err
}
@ -188,7 +196,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
statePath := filepath.Join(path, stateFilePath)
fh, err := os.Create(statePath)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to create state file: %v", err)
f.logger.Error("failed to create state file", "error", err)
return nil, err
}
sink.stateFile = fh
@ -209,7 +217,7 @@ func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) {
// Get the eligible snapshots
snapshots, err := f.getSnapshots()
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err)
f.logger.Error("failed to get snapshots", "error", err)
return nil, err
}
@ -228,7 +236,7 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
// Get the eligible snapshots
snapshots, err := ioutil.ReadDir(f.path)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to scan snapshot dir: %v", err)
f.logger.Error("failed to scan snapshot directory", "error", err)
return nil, err
}
@ -243,20 +251,20 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
// Ignore any temporary snapshots
dirName := snap.Name()
if strings.HasSuffix(dirName, tmpSuffix) {
f.logger.Printf("[WARN] snapshot: Found temporary snapshot: %v", dirName)
f.logger.Warn("found temporary snapshot", "name", dirName)
continue
}
// Try to read the meta data
meta, err := f.readMeta(dirName)
if err != nil {
f.logger.Printf("[WARN] snapshot: Failed to read metadata for %v: %v", dirName, err)
f.logger.Warn("failed to read metadata", "name", dirName, "error", err)
continue
}
// Make sure we can understand this version.
if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax {
f.logger.Printf("[WARN] snapshot: Snapshot version for %v not supported: %d", dirName, meta.Version)
f.logger.Warn("snapshot version not supported", "name", dirName, "version", meta.Version)
continue
}
@ -297,7 +305,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
// Get the metadata
meta, err := f.readMeta(id)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get meta data to open snapshot: %v", err)
f.logger.Error("failed to get meta data to open snapshot", "error", err)
return nil, nil, err
}
@ -305,7 +313,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
statePath := filepath.Join(f.path, id, stateFilePath)
fh, err := os.Open(statePath)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to open state file: %v", err)
f.logger.Error("failed to open state file", "error", err)
return nil, nil, err
}
@ -315,7 +323,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
// Compute the hash
_, err = io.Copy(stateHash, fh)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to read state file: %v", err)
f.logger.Error("failed to read state file", "error", err)
fh.Close()
return nil, nil, err
}
@ -323,15 +331,14 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
// Verify the hash
computed := stateHash.Sum(nil)
if bytes.Compare(meta.CRC, computed) != 0 {
f.logger.Printf("[ERR] snapshot: CRC checksum failed (stored: %v computed: %v)",
meta.CRC, computed)
f.logger.Error("CRC checksum failed", "stored", meta.CRC, "computed", computed)
fh.Close()
return nil, nil, fmt.Errorf("CRC mismatch")
}
// Seek to the start
if _, err := fh.Seek(0, 0); err != nil {
f.logger.Printf("[ERR] snapshot: State file seek failed: %v", err)
f.logger.Error("state file seek failed", "error", err)
fh.Close()
return nil, nil, err
}
@ -349,15 +356,15 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
func (f *FileSnapshotStore) ReapSnapshots() error {
snapshots, err := f.getSnapshots()
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err)
f.logger.Error("failed to get snapshots", "error", err)
return err
}
for i := f.retain; i < len(snapshots); i++ {
path := filepath.Join(f.path, snapshots[i].ID)
f.logger.Printf("[INFO] snapshot: reaping snapshot %v", path)
f.logger.Info("reaping snapshot", "path", path)
if err := os.RemoveAll(path); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to reap snapshot %v: %v", path, err)
f.logger.Error("failed to reap snapshot", "path", path, "error", err)
return err
}
}
@ -386,9 +393,9 @@ func (s *FileSnapshotSink) Close() error {
// Close the open handles
if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err)
s.logger.Error("failed to finalize snapshot", "error", err)
if delErr := os.RemoveAll(s.dir); delErr != nil {
s.logger.Printf("[ERR] snapshot: Failed to delete temporary snapshot directory at path %v: %v", s.dir, delErr)
s.logger.Error("failed to delete temporary snapshot directory", "path", s.dir, "error", delErr)
return delErr
}
return err
@ -396,27 +403,27 @@ func (s *FileSnapshotSink) Close() error {
// Write out the meta data
if err := s.writeMeta(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err)
s.logger.Error("failed to write metadata", "error", err)
return err
}
// Move the directory into place
newPath := strings.TrimSuffix(s.dir, tmpSuffix)
if err := os.Rename(s.dir, newPath); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to move snapshot into place: %v", err)
s.logger.Error("failed to move snapshot into place", "error", err)
return err
}
if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
parentFH, err := os.Open(s.parentDir)
defer parentFH.Close()
if err != nil {
s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err)
s.logger.Error("failed to open snapshot parent directory", "path", s.parentDir, "error", err)
return err
}
if err = parentFH.Sync(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err)
s.logger.Error("failed syncing parent directory", "path", s.parentDir, "error", err)
return err
}
}
@ -439,7 +446,7 @@ func (s *FileSnapshotSink) Cancel() error {
// Close the open handles
if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err)
s.logger.Error("failed to finalize snapshot", "error", err)
return err
}
@ -480,9 +487,11 @@ func (s *FileSnapshotSink) finalize() error {
// writeMeta is used to write out the metadata we have.
func (s *FileSnapshotSink) writeMeta() error {
var err error
// Open the meta file
metaPath := filepath.Join(s.dir, metaFilePath)
fh, err := os.Create(metaPath)
var fh *os.File
fh, err = os.Create(metaPath)
if err != nil {
return err
}
@ -493,7 +502,7 @@ func (s *FileSnapshotSink) writeMeta() error {
// Write out as JSON
enc := json.NewEncoder(buffered)
if err := enc.Encode(&s.meta); err != nil {
if err = enc.Encode(&s.meta); err != nil {
return err
}
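With the hunks above, FileSnapshotStore logs through hclog, and NewFileSnapshotStoreWithLogger now takes an hclog.Logger instead of a *log.Logger. A minimal sketch of constructing the store with a named logger; the directory, retention count, and logger name are arbitrary choices.

package main

import (
	"log"
	"os"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

func main() {
	// A named hclog logger; passing nil instead makes the store build an
	// equivalent default logger itself, as the nil check above shows.
	logger := hclog.New(&hclog.LoggerOptions{
		Name:   "snapshot",
		Level:  hclog.Debug,
		Output: os.Stderr,
	})

	// Keep the two most recent snapshots under ./raft-snapshots.
	snaps, err := raft.NewFileSnapshotStoreWithLogger("raft-snapshots", 2, logger)
	if err != nil {
		log.Fatalf("failed to create snapshot store: %v", err)
	}
	_ = snaps // pass to raft.NewRaft alongside the other stores
}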

@ -31,6 +31,26 @@ type FSM interface {
Restore(io.ReadCloser) error
}
// BatchingFSM extends the FSM interface to add an ApplyBatch function. This can
// optionally be implemented by clients to enable multiple logs to be applied to
// the FSM in batches. Up to MaxAppendEntries could be sent in a batch.
type BatchingFSM interface {
// ApplyBatch is invoked once a batch of log entries has been committed and
// are ready to be applied to the FSM. ApplyBatch will take in an array of
// log entries. These log entries will be in the order they were committed,
// will not have gaps, and could be of a few log types. Clients should check
// the log type prior to attempting to decode the data attached. Presently
// the LogCommand and LogConfiguration types will be sent.
//
// The returned slice must be the same length as the input and each response
// should correlate to the log at the same index of the input. The returned
// values will be made available in the ApplyFuture returned by Raft.Apply
// method if that method was called on the same Raft node as the FSM.
ApplyBatch([]*Log) []interface{}
FSM
}
// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
@ -49,7 +69,10 @@ type FSMSnapshot interface {
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
commit := func(req *commitTuple) {
batchingFSM, batchingEnabled := r.fsm.(BatchingFSM)
configStore, configStoreEnabled := r.fsm.(ConfigurationStore)
commitSingle := func(req *commitTuple) {
// Apply the log if a command or config change
var resp interface{}
// Make sure we send a response
@ -68,15 +91,14 @@ func (r *Raft) runFSM() {
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
case LogConfiguration:
configStore, ok := r.fsm.(ConfigurationStore)
if !ok {
if !configStoreEnabled {
// Return early to avoid incrementing the index and term for
// an unimplemented operation.
return
}
start := time.Now()
configStore.StoreConfiguration(req.log.Index, decodeConfiguration(req.log.Data))
configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data))
metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start)
}
@ -85,6 +107,67 @@ func (r *Raft) runFSM() {
lastTerm = req.log.Term
}
commitBatch := func(reqs []*commitTuple) {
if !batchingEnabled {
for _, ct := range reqs {
commitSingle(ct)
}
return
}
// Only send LogCommand and LogConfiguration log types. LogBarrier types
// will not be sent to the FSM.
shouldSend := func(l *Log) bool {
switch l.Type {
case LogCommand, LogConfiguration:
return true
}
return false
}
var lastBatchIndex, lastBatchTerm uint64
sendLogs := make([]*Log, 0, len(reqs))
for _, req := range reqs {
if shouldSend(req.log) {
sendLogs = append(sendLogs, req.log)
}
lastBatchIndex = req.log.Index
lastBatchTerm = req.log.Term
}
var responses []interface{}
if len(sendLogs) > 0 {
start := time.Now()
responses = batchingFSM.ApplyBatch(sendLogs)
metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start)
metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs)))
// Ensure we get the expected responses
if len(sendLogs) != len(responses) {
panic("invalid number of responses")
}
}
// Update the indexes
lastIndex = lastBatchIndex
lastTerm = lastBatchTerm
var i int
for _, req := range reqs {
var resp interface{}
// If the log was sent to the FSM, retrieve the response.
if shouldSend(req.log) {
resp = responses[i]
i++
}
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}
}
restore := func(req *restoreFuture) {
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
@ -132,8 +215,8 @@ func (r *Raft) runFSM() {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)
case []*commitTuple:
commitBatch(req)
case *restoreFuture:
restore(req)
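The BatchingFSM interface above is opt-in: runFSM only takes the batching path when the application's FSM also implements ApplyBatch. Below is a minimal sketch of such an implementation; the key=value command encoding and the kvFSM type are hypothetical, and snapshotting is stubbed out.

package kvfsm

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"

	"github.com/hashicorp/raft"
)

// kvFSM is a hypothetical state machine that stores "key=value" commands.
type kvFSM struct {
	mu   sync.Mutex
	data map[string]string
}

// Apply satisfies raft.FSM for single log entries.
func (f *kvFSM) Apply(l *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.applyLocked(l)
}

// ApplyBatch makes kvFSM a raft.BatchingFSM: the whole committed batch is
// applied under one lock acquisition, and the returned slice carries one
// response per input log, in order, as the interface contract requires.
func (f *kvFSM) ApplyBatch(logs []*raft.Log) []interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	resps := make([]interface{}, len(logs))
	for i, l := range logs {
		resps[i] = f.applyLocked(l)
	}
	return resps
}

func (f *kvFSM) applyLocked(l *raft.Log) interface{} {
	// Only LogCommand entries carry application data; LogConfiguration
	// entries (also delivered to a BatchingFSM) get a nil response.
	if l.Type != raft.LogCommand {
		return nil
	}
	parts := strings.SplitN(string(l.Data), "=", 2)
	if len(parts) != 2 {
		return fmt.Errorf("malformed command: %q", l.Data)
	}
	if f.data == nil {
		f.data = make(map[string]string)
	}
	f.data[parts[0]] = parts[1]
	return nil
}

// Snapshot and Restore are stubbed out for brevity.
func (f *kvFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, errors.New("not implemented") }
func (f *kvFSM) Restore(rc io.ReadCloser) error      { rc.Close(); return errors.New("not implemented") }

Per the interface comment, a batch holds at most MaxAppendEntries logs, and the runFSM changes above record raft.fsm.applyBatch timing plus an applyBatchNum sample for each batch sent.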

@ -183,14 +183,13 @@ type userSnapshotFuture struct {
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
if u.opener == nil {
return nil, nil, fmt.Errorf("no snapshot available")
} else {
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
}
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
}
// userRestoreFuture is used for waiting on a user-triggered restore of an

@ -100,10 +100,12 @@ func (s *InmemSnapshotSink) Close() error {
return nil
}
// ID returns the ID of the SnapshotMeta
func (s *InmemSnapshotSink) ID() string {
return s.meta.ID
}
// Cancel returns successfully with a nil error
func (s *InmemSnapshotSink) Cancel() error {
return nil
}

@ -24,7 +24,7 @@ type inmemPipeline struct {
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdownLock sync.RWMutex
}
type inmemPipelineInflight struct {
@ -314,6 +314,17 @@ func (i *inmemPipeline) AppendEntries(args *AppendEntriesRequest, resp *AppendEn
Command: args,
RespChan: respCh,
}
// Check if we have been already shutdown, otherwise the random choose
// made by select statement below might pick consumerCh even if
// shutdownCh was closed.
i.shutdownLock.RLock()
shutdown := i.shutdown
i.shutdownLock.RUnlock()
if shutdown {
return nil, ErrPipelineShutdown
}
select {
case i.peer.consumerCh <- rpc:
case <-timeout:

@ -10,12 +10,12 @@ const (
// LogNoop is used to assert leadership.
LogNoop
// LogAddPeer is used to add a new peer. This should only be used with
// LogAddPeerDeprecated is used to add a new peer. This should only be used with
// older protocol versions designed to be compatible with unversioned
// Raft servers. See comments in config.go for details.
LogAddPeerDeprecated
// LogRemovePeer is used to remove an existing peer. This should only be
// LogRemovePeerDeprecated is used to remove an existing peer. This should only be
// used with older protocol versions designed to be compatible with
// unversioned Raft servers. See comments in config.go for details.
LogRemovePeerDeprecated

@ -5,8 +5,8 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"io"
"log"
"net"
"os"
"sync"
@ -66,7 +66,7 @@ type NetworkTransport struct {
heartbeatFn func(RPC)
heartbeatFnLock sync.Mutex
logger *log.Logger
logger hclog.Logger
maxPool int
@ -92,7 +92,7 @@ type NetworkTransportConfig struct {
// ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC
ServerAddressProvider ServerAddressProvider
Logger *log.Logger
Logger hclog.Logger
// Dialer
Stream StreamLayer
@ -105,6 +105,7 @@ type NetworkTransportConfig struct {
Timeout time.Duration
}
// ServerAddressProvider is a target address to which we invoke an RPC when establishing a connection
type ServerAddressProvider interface {
ServerAddr(id ServerID) (ServerAddress, error)
}
@ -148,7 +149,11 @@ func NewNetworkTransportWithConfig(
config *NetworkTransportConfig,
) *NetworkTransport {
if config.Logger == nil {
config.Logger = log.New(os.Stderr, "", log.LstdFlags)
config.Logger = hclog.New(&hclog.LoggerOptions{
Name: "raft-net",
Output: hclog.DefaultOutput,
Level: hclog.DefaultLevel,
})
}
trans := &NetworkTransport{
connPool: make(map[ServerAddress][]*netConn),
@ -182,7 +187,11 @@ func NewNetworkTransport(
if logOutput == nil {
logOutput = os.Stderr
}
logger := log.New(logOutput, "", log.LstdFlags)
logger := hclog.New(&hclog.LoggerOptions{
Name: "raft-net",
Output: logOutput,
Level: hclog.DefaultLevel,
})
config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
return NewNetworkTransportWithConfig(config)
}
@ -195,7 +204,7 @@ func NewNetworkTransportWithLogger(
stream StreamLayer,
maxPool int,
timeout time.Duration,
logger *log.Logger,
logger hclog.Logger,
) *NetworkTransport {
config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
return NewNetworkTransportWithConfig(config)
@ -310,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
if n.serverAddressProvider != nil {
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
if err != nil {
n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err)
n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err)
} else {
return serverAddressOverride
}
@ -486,7 +495,7 @@ func (n *NetworkTransport) listen() {
}
if !n.IsShutdown() {
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err)
n.logger.Error("failed to accept connection", "error", err)
}
select {
@ -499,7 +508,7 @@ func (n *NetworkTransport) listen() {
// No error, reset loop delay
loopDelay = 0
n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr())
n.logger.Debug("accepted connection", "local-address", n.LocalAddr(), "remote-address", conn.RemoteAddr().String())
// Handle the connection in dedicated routine
go n.handleConn(n.getStreamContext(), conn)
@ -519,19 +528,19 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
for {
select {
case <-connCtx.Done():
n.logger.Println("[DEBUG] raft-net: stream layer is closed")
n.logger.Debug("stream layer is closed")
return
default:
}
if err := n.handleCommand(r, dec, enc); err != nil {
if err != io.EOF {
n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err)
n.logger.Error("failed to decode incoming command", "error", err)
}
return
}
if err := w.Flush(); err != nil {
n.logger.Printf("[ERR] raft-net: Failed to flush response: %v", err)
n.logger.Error("failed to flush response", "error", err)
return
}
}
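NetworkTransportConfig above now carries an hclog.Logger, and its optional ServerAddressProvider lets callers override the dial address per server ID. A minimal sketch of a static provider wired into the transport config; the address map, the stream parameter, and the pool/timeout values are placeholders.

package rafttransport

import (
	"fmt"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

// staticAddrs is a hypothetical ServerAddressProvider backed by a fixed map.
type staticAddrs map[raft.ServerID]raft.ServerAddress

func (s staticAddrs) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
	addr, ok := s[id]
	if !ok {
		return "", fmt.Errorf("no address known for server %q", id)
	}
	return addr, nil
}

// newTransport wires the provider and an hclog logger into the transport.
// stream is an existing raft.StreamLayer (for example, a TCP stream layer).
func newTransport(stream raft.StreamLayer, logger hclog.Logger) *raft.NetworkTransport {
	config := &raft.NetworkTransportConfig{
		Stream:  stream,
		MaxPool: 3,
		Timeout: 10 * time.Second,
		Logger:  logger,
		ServerAddressProvider: staticAddrs{
			"node-1": "10.0.0.1:8300", // placeholder addresses
			"node-2": "10.0.0.2:8300",
		},
	}
	return raft.NewNetworkTransportWithConfig(config)
}

When no provider is set, the transport dials the configured target directly; when the provider returns an error, it logs a warning and falls back to that target, as the getProviderAddressOrFallback hunk above shows.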

@ -18,7 +18,7 @@ type Observation struct {
// LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct {
leader ServerAddress
Leader ServerAddress
}
// PeerObservation is sent to observers when peers change.
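The hunk above exports the Leader field on LeaderObservation, one of the 1.1.2 bug fixes, so code outside the package can read it. Below is a minimal sketch of watching leadership changes; it assumes the package's existing observer API (NewObserver, RegisterObserver, Observation), which this diff does not show, and an already-constructed *raft.Raft.

package raftobserve

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership registers an observer on r and logs each leadership change.
func watchLeadership(r *raft.Raft) {
	obsCh := make(chan raft.Observation, 16)

	// Filter so only LeaderObservation events reach the channel.
	filter := func(o *raft.Observation) bool {
		_, ok := o.Data.(raft.LeaderObservation)
		return ok
	}
	r.RegisterObserver(raft.NewObserver(obsCh, false, filter))

	go func() {
		for o := range obsCh {
			lo := o.Data.(raft.LeaderObservation)
			// An empty Leader means leadership was lost or is unknown.
			log.Printf("raft leader changed: %q", lo.Leader)
		}
	}()
}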

@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
"github.com/armon/go-metrics"
)
@ -94,7 +96,7 @@ func (r *Raft) setLeader(leader ServerAddress) {
r.leader = leader
r.leaderLock.Unlock()
if oldLeader != leader {
r.observe(LeaderObservation{leader: leader})
r.observe(LeaderObservation{Leader: leader})
}
}
@ -147,7 +149,7 @@ func (r *Raft) run() {
// runFollower runs the FSM for a follower.
func (r *Raft) runFollower() {
didWarn := false
r.logger.Info(fmt.Sprintf("%v entering Follower state (Leader: %q)", r, r.Leader()))
r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
@ -209,7 +211,7 @@ func (r *Raft) runFollower() {
didWarn = true
}
} else {
r.logger.Warn(fmt.Sprintf("Heartbeat timeout from %q reached, starting election", lastLeader))
r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader)
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
r.setState(Candidate)
return
@ -245,7 +247,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
// runCandidate runs the FSM for a candidate.
func (r *Raft) runCandidate() {
r.logger.Info(fmt.Sprintf("%v entering Candidate state in term %v", r, r.getCurrentTerm()+1))
r.logger.Info("entering candidate state", "node", r, "term", r.getCurrentTerm()+1)
metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1)
// Start vote for us, and set a timeout
@ -263,7 +265,7 @@ func (r *Raft) runCandidate() {
// Tally the votes, need a simple majority
grantedVotes := 0
votesNeeded := r.quorumSize()
r.logger.Debug(fmt.Sprintf("Votes needed: %d", votesNeeded))
r.logger.Debug("votes", "needed", votesNeeded)
for r.getState() == Candidate {
select {
@ -273,7 +275,7 @@ func (r *Raft) runCandidate() {
case vote := <-voteCh:
// Check if the term is greater than ours, bail
if vote.Term > r.getCurrentTerm() {
r.logger.Debug("Newer term discovered, fallback to follower")
r.logger.Debug("newer term discovered, fallback to follower")
r.setState(Follower)
r.setCurrentTerm(vote.Term)
return
@ -282,13 +284,12 @@ func (r *Raft) runCandidate() {
// Check if the vote is granted
if vote.Granted {
grantedVotes++
r.logger.Debug(fmt.Sprintf("Vote granted from %s in term %v. Tally: %d",
vote.voterID, vote.Term, grantedVotes))
r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes)
}
// Check if we've become the leader
if grantedVotes >= votesNeeded {
r.logger.Info(fmt.Sprintf("Election won. Tally: %d", grantedVotes))
r.logger.Info("election won", "tally", grantedVotes)
r.setState(Leader)
r.setLeader(r.localAddr)
return
@ -359,7 +360,7 @@ func (r *Raft) setupLeaderState() {
// runLeader runs the FSM for a leader. Do the setup here and drop into
// the leaderLoop for the hot loop.
func (r *Raft) runLeader() {
r.logger.Info(fmt.Sprintf("%v entering Leader state", r))
r.logger.Info("entering leader state", "leader", r)
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
// Notify that we are the leader
@ -470,7 +471,7 @@ func (r *Raft) startStopReplication() {
}
inConfig[server.ID] = true
if _, ok := r.leaderState.replState[server.ID]; !ok {
r.logger.Info(fmt.Sprintf("Added peer %v, starting replication", server.ID))
r.logger.Info("added peer, starting replication", "peer", server.ID)
s := &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
@ -497,7 +498,7 @@ func (r *Raft) startStopReplication() {
continue
}
// Replicate up to lastIdx and stop
r.logger.Info(fmt.Sprintf("Removed peer %v, stopping replication after %v", serverID, lastIdx))
r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx)
repl.stopCh <- lastIdx
close(repl.stopCh)
delete(r.leaderState.replState, serverID)
@ -618,49 +619,58 @@ func (r *Raft) leaderLoop() {
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
// New configuration has been committed, set it as the committed
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
}
var numProcessed int
start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
for {
e := r.leaderState.inflight.Front()
if e == nil {
break
}
// Pull all inflight logs that are committed off the queue.
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index
if idx > commitIndex {
// Don't go past the committed index
break
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
r.processLogs(idx, commitLog)
// Process the group
if len(groupReady) != 0 {
r.processLogs(lastIdxInGroup, groupFutures)
r.leaderState.inflight.Remove(e)
numProcessed++
for _, e := range groupReady {
r.leaderState.inflight.Remove(e)
}
}
// Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)
// Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed))
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))
if stepDown {
if r.conf.ShutdownOnRemove {
r.logger.Info("Removed ourself, shutting down")
r.logger.Info("removed ourself, shutting down")
r.Shutdown()
} else {
r.logger.Info("Removed ourself, transitioning to follower")
r.logger.Info("removed ourself, transitioning to follower")
r.setState(Follower)
}
}
@ -672,7 +682,7 @@ func (r *Raft) leaderLoop() {
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("New leader elected, stepping down")
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
@ -867,9 +877,9 @@ func (r *Raft) checkLeaderLease() time.Duration {
} else {
// Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout {
r.logger.Warn(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff))
r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else {
r.logger.Debug(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff))
r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
}
}
metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond))
@ -879,7 +889,7 @@ func (r *Raft) checkLeaderLease() time.Duration {
// Verify we can contact a quorum
quorum := r.quorumSize()
if contacted < quorum {
r.logger.Warn("Failed to contact quorum of nodes, stepping down")
r.logger.Warn("failed to contact quorum of nodes, stepping down")
r.setState(Follower)
metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1)
}
@ -967,7 +977,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
if err := sink.Close(); err != nil {
return fmt.Errorf("failed to close snapshot: %v", err)
}
r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n))
r.logger.Info("copied to local snapshot", "bytes", n)
// Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out.
@ -991,7 +1001,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)
r.logger.Info(fmt.Sprintf("Restored user snapshot (index %d)", lastIndex))
r.logger.Info("restored user snapshot", "index", latestIndex)
return nil
}
@ -1005,8 +1015,11 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
return
}
r.logger.Info(fmt.Sprintf("Updating configuration with %s (%v, %v) to %+v",
future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers))
r.logger.Info("updating configuration",
"command", future.req.command,
"server-id", future.req.serverID,
"server-addr", future.req.serverAddress,
"servers", hclog.Fmt("%+v", configuration.Servers))
// In pre-ID compatibility mode we translate all configuration changes
// in to an old remove peer message, which can handle all supported
@ -1023,14 +1036,13 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
} else {
future.log = Log{
Type: LogConfiguration,
Data: encodeConfiguration(configuration),
Data: EncodeConfiguration(configuration),
}
}
r.dispatchLogs([]*logFuture{&future.logFuture})
index := future.Index()
r.configurations.latest = configuration
r.configurations.latestIndex = index
r.setLatestConfiguration(configuration, index)
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()
}
@ -1059,7 +1071,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
// Write the log entry locally
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error(fmt.Sprintf("Failed to commit logs: %v", err))
r.logger.Error("failed to commit logs", "error", err)
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
@ -1081,72 +1093,88 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
// applied up to the given index limit.
// This can be called from both leaders and followers.
// Followers call this from AppendEntries, for n entries at a time, and always
// pass future=nil.
// Leaders call this once per inflight when entries are committed. They pass
// the future from inflights.
func (r *Raft) processLogs(index uint64, future *logFuture) {
// pass futures=nil.
// Leaders call this when entries are committed. They pass the futures from any
// inflight logs.
func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
// Reject logs we've applied already
lastApplied := r.getLastApplied()
if index <= lastApplied {
r.logger.Warn(fmt.Sprintf("Skipping application of old log: %d", index))
r.logger.Warn("skipping application of old log", "index", index)
return
}
applyBatch := func(batch []*commitTuple) {
select {
case r.fsmMutateCh <- batch:
case <-r.shutdownCh:
for _, cl := range batch {
if cl.future != nil {
cl.future.respond(ErrRaftShutdown)
}
}
}
}
batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Apply all the preceding logs
for idx := r.getLastApplied() + 1; idx <= index; idx++ {
for idx := lastApplied + 1; idx <= index; idx++ {
var preparedLog *commitTuple
// Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx {
r.processLog(&future.log, future)
future, futureOk := futures[idx]
if futureOk {
preparedLog = r.prepareLog(&future.log, future)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", idx, err))
r.logger.Error("failed to get log", "index", idx, "error", err)
panic(err)
}
r.processLog(l, nil)
preparedLog = r.prepareLog(l, nil)
}
// Update the lastApplied index and term
r.setLastApplied(idx)
switch {
case preparedLog != nil:
// If we have a log ready to send to the FSM add it to the batch.
// The FSM thread will respond to the future.
batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
}
case futureOk:
// Invoke the future if given.
future.respond(nil)
}
}
// If there are any remaining logs in the batch apply them
if len(batch) != 0 {
applyBatch(batch)
}
// Update the lastApplied index and term
r.setLastApplied(index)
}
// processLog is invoked to process the application of a single committed log entry.
func (r *Raft) processLog(l *Log, future *logFuture) {
func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple {
switch l.Type {
case LogBarrier:
// Barrier is handled by the FSM
fallthrough
case LogCommand:
// Forward to the fsm handler
select {
case r.fsmMutateCh <- &commitTuple{l, future}:
case <-r.shutdownCh:
if future != nil {
future.respond(ErrRaftShutdown)
}
}
// Return so that the future is only responded to
// by the FSM handler when the application is done
return
return &commitTuple{l, future}
case LogConfiguration:
// Only support this with the v2 configuration format
if r.protocolVersion > 2 {
// Forward to the fsm handler
select {
case r.fsmMutateCh <- &commitTuple{l, future}:
case <-r.shutdownCh:
if future != nil {
future.respond(ErrRaftShutdown)
}
}
// Return so that the future is only responded to
// by the FSM handler when the application is done
return
return &commitTuple{l, future}
}
case LogAddPeerDeprecated:
case LogRemovePeerDeprecated:
@ -1157,10 +1185,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
panic(fmt.Errorf("unrecognized log type: %#v", l))
}
// Invoke the future if given
if future != nil {
future.respond(nil)
}
return nil
}
// processRPC is called to handle an incoming RPC request. This must only be
@ -1181,7 +1206,8 @@ func (r *Raft) processRPC(rpc RPC) {
case *TimeoutNowRequest:
r.timeoutNow(rpc, cmd)
default:
r.logger.Error(fmt.Sprintf("Got unexpected command: %#v", rpc.Command))
r.logger.Error("got unexpected command",
"command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
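The processLogs rewrite above now collects committed entries into batches of up to MaxAppendEntries before handing them to the FSM goroutine over fsmMutateCh, flushing any remainder at the end. A hedged sketch of that batch-and-flush shape in isolation (plain ints and a generic channel stand in for commitTuple and fsmMutateCh; none of these names are raft's own):

package main

import "fmt"

func processInBatches(items []int, maxBatch int, out chan<- []int) {
	batch := make([]int, 0, maxBatch)
	for _, it := range items {
		batch = append(batch, it)
		// Once the batch is full, hand it off and start a fresh one.
		if len(batch) >= maxBatch {
			out <- batch
			batch = make([]int, 0, maxBatch)
		}
	}
	// Anything left over still has to be delivered.
	if len(batch) != 0 {
		out <- batch
	}
}

func main() {
	out := make(chan []int, 4)
	processInBatches([]int{1, 2, 3, 4, 5}, 2, out)
	close(out)
	for b := range out {
		fmt.Println(b) // [1 2], [3 4], [5]
	}
}
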
@ -1204,7 +1230,7 @@ func (r *Raft) processHeartbeat(rpc RPC) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
default:
r.logger.Error(fmt.Sprintf("Expected heartbeat, got command: %#v", rpc.Command))
r.logger.Error("expected heartbeat, got", "command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
@ -1254,8 +1280,10 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
} else {
var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Warn(fmt.Sprintf("Failed to get previous log: %d %v (last: %d)",
a.PrevLogEntry, err, lastIdx))
r.logger.Warn("failed to get previous log",
"previous-index", a.PrevLogEntry,
"last-index", lastIdx,
"error", err)
resp.NoRetryBackoff = true
return
}
@ -1263,8 +1291,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
}
if a.PrevLogTerm != prevLogTerm {
r.logger.Warn(fmt.Sprintf("Previous log term mis-match: ours: %d remote: %d",
prevLogTerm, a.PrevLogTerm))
r.logger.Warn("previous log term mis-match",
"ours", prevLogTerm,
"remote", a.PrevLogTerm)
resp.NoRetryBackoff = true
return
}
@ -1284,19 +1313,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
}
var storeEntry Log
if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
r.logger.Warn(fmt.Sprintf("Failed to get log entry %d: %v",
entry.Index, err))
r.logger.Warn("failed to get log entry",
"index", entry.Index,
"error", err)
return
}
if entry.Term != storeEntry.Term {
r.logger.Warn(fmt.Sprintf("Clearing log suffix from %d to %d", entry.Index, lastLogIdx))
r.logger.Warn("clearing log suffix",
"from", entry.Index,
"to", lastLogIdx)
if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
r.logger.Error(fmt.Sprintf("Failed to clear log suffix: %v", err))
r.logger.Error("failed to clear log suffix", "error", err)
return
}
if entry.Index <= r.configurations.latestIndex {
r.configurations.latest = r.configurations.committed
r.configurations.latestIndex = r.configurations.committedIndex
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
@ -1306,7 +1337,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
if n := len(newEntries); n > 0 {
// Append the new entries
if err := r.logs.StoreLogs(newEntries); err != nil {
r.logger.Error(fmt.Sprintf("Failed to append to logs: %v", err))
r.logger.Error("failed to append to logs", "error", err)
// TODO: leaving r.getLastLog() in the wrong
// state if there was a truncation above
return
@ -1331,8 +1362,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
@ -1349,15 +1379,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodePeers(entry.Data, r.trans)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
}
}
@ -1389,8 +1415,9 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// vote!
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer {
r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since we have a leader: %v",
candidate, leader))
r.logger.Warn("rejecting vote request since we have a leader",
"from", candidate,
"leader", leader)
return
}
@ -1402,7 +1429,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Increase the term if we see a newer one
if req.Term > r.getCurrentTerm() {
// Ensure transition to follower
r.logger.Debug("lost leadership because received a requestvote with newer term")
r.logger.Debug("lost leadership because received a requestVote with a newer term")
r.setState(Follower)
r.setCurrentTerm(req.Term)
resp.Term = req.Term
@ -1411,20 +1438,20 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Check if we have voted yet
lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)
if err != nil && err.Error() != "not found" {
r.logger.Error(fmt.Sprintf("Failed to get last vote term: %v", err))
r.logger.Error("failed to get last vote term", "error", err)
return
}
lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)
if err != nil && err.Error() != "not found" {
r.logger.Error(fmt.Sprintf("Failed to get last vote candidate: %v", err))
r.logger.Error("failed to get last vote candidate", "error", err)
return
}
// Check if we've voted in this election before
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Info(fmt.Sprintf("Duplicate RequestVote for same term: %d", req.Term))
r.logger.Info("duplicate requestVote for same term", "term", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Warn(fmt.Sprintf("Duplicate RequestVote from candidate: %s", req.Candidate))
r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate)
resp.Granted = true
}
return
@ -1433,20 +1460,24 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Reject if their term is older
lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm {
r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last term is greater (%d, %d)",
candidate, lastTerm, req.LastLogTerm))
r.logger.Warn("rejecting vote request since our last term is greater",
"candidate", candidate,
"last-term", lastTerm,
"last-candidate-term", req.LastLogTerm)
return
}
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last index is greater (%d, %d)",
candidate, lastIdx, req.LastLogIndex))
r.logger.Warn("rejecting vote request since our last index is greater",
"candidate", candidate,
"last-index", lastIdx,
"last-candidate-index", req.LastLogIndex)
return
}
// Persist a vote for safety
if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error(fmt.Sprintf("Failed to persist vote: %v", err))
r.logger.Error("failed to persist vote", "error", err)
return
}
@ -1481,8 +1512,9 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Ignore an older term
if req.Term < r.getCurrentTerm() {
r.logger.Info(fmt.Sprintf("Ignoring installSnapshot request with older term of %d vs currentTerm %d",
req.Term, r.getCurrentTerm()))
r.logger.Info("ignoring installSnapshot request with older term than current term",
"request-term", req.Term,
"current-term", r.getCurrentTerm())
return
}
@ -1501,7 +1533,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
var reqConfiguration Configuration
var reqConfigurationIndex uint64
if req.SnapshotVersion > 0 {
reqConfiguration = decodeConfiguration(req.Configuration)
reqConfiguration = DecodeConfiguration(req.Configuration)
reqConfigurationIndex = req.ConfigurationIndex
} else {
reqConfiguration = decodePeers(req.Peers, r.trans)
@ -1511,7 +1543,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm,
reqConfiguration, reqConfigurationIndex, r.trans)
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to create snapshot to install: %v", err))
r.logger.Error("failed to create snapshot to install", "error", err)
rpcErr = fmt.Errorf("failed to create snapshot: %v", err)
return
}
@ -1520,7 +1552,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
n, err := io.Copy(sink, rpc.Reader)
if err != nil {
sink.Cancel()
r.logger.Error(fmt.Sprintf("Failed to copy snapshot: %v", err))
r.logger.Error("failed to copy snapshot", "error", err)
rpcErr = err
return
}
@ -1528,18 +1560,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Check that we received it all
if n != req.Size {
sink.Cancel()
r.logger.Error(fmt.Sprintf("Failed to receive whole snapshot: %d / %d", n, req.Size))
r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
rpcErr = fmt.Errorf("short read")
return
}
// Finalize the snapshot
if err := sink.Close(); err != nil {
r.logger.Error(fmt.Sprintf("Failed to finalize snapshot: %v", err))
r.logger.Error("failed to finalize snapshot", "error", err)
rpcErr = err
return
}
r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n))
r.logger.Info("copied to local snapshot", "bytes", n)
// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
@ -1553,7 +1586,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Wait for the restore to happen
if err := future.Error(); err != nil {
r.logger.Error(fmt.Sprintf("Failed to restore snapshot: %v", err))
r.logger.Error("failed to restore snapshot", "error", err)
rpcErr = err
return
}
@ -1565,14 +1598,12 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)
// Restore the peer set
r.configurations.latest = reqConfiguration
r.configurations.latestIndex = reqConfigurationIndex
r.configurations.committed = reqConfiguration
r.configurations.committedIndex = reqConfigurationIndex
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)
// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
r.logger.Error(fmt.Sprintf("Failed to compact logs: %v", err))
r.logger.Error("failed to compact logs", "error", err)
}
r.logger.Info("Installed remote snapshot")
@ -1622,7 +1653,9 @@ func (r *Raft) electSelf() <-chan *voteResult {
resp := &voteResult{voterID: peer.ID}
err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse)
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to make RequestVote RPC to %v: %v", peer, err))
r.logger.Error("failed to make requestVote RPC",
"target", peer,
"error", err)
resp.Term = req.Term
resp.Granted = false
}
@ -1636,7 +1669,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
if server.ID == r.localID {
// Persist a vote for ourselves
if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error(fmt.Sprintf("Failed to persist vote : %v", err))
r.logger.Error("failed to persist vote", "error", err)
return nil
}
// Include our own vote
@ -1753,3 +1786,30 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.candidateFromLeadershipTransfer = true
rpc.Respond(&TimeoutNowResponse{}, nil)
}
// setLatestConfiguration stores the latest configuration and updates a copy of it.
func (r *Raft) setLatestConfiguration(c Configuration, i uint64) {
r.configurations.latest = c
r.configurations.latestIndex = i
r.latestConfiguration.Store(c.Clone())
}
// setCommittedConfiguration stores the committed configuration.
func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) {
r.configurations.committed = c
r.configurations.committedIndex = i
}
// getLatestConfiguration reads the configuration from a copy of the main
// configuration, which means it can be accessed independently from the main
// loop.
func (r *Raft) getLatestConfiguration() Configuration {
// this switch catches the case where this is called without having set
// a configuration previously.
switch c := r.latestConfiguration.Load().(type) {
case Configuration:
return c
default:
return Configuration{}
}
}
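The new setLatestConfiguration/getLatestConfiguration helpers above keep a cloned copy of the latest configuration in what is presumably an atomic value on the Raft struct, so readers outside the main loop never touch the mutable configurations state. A minimal stand-alone sketch of that store-a-clone, lock-free-read pattern (the config type, clone method, and field names are stand-ins, not raft's own):

package main

import (
	"fmt"
	"sync/atomic"
)

// config stands in for raft's Configuration; clone returns a deep copy so the
// stored snapshot is never aliased by the writer's mutable state.
type config struct {
	servers []string
}

func (c config) clone() config {
	out := config{servers: make([]string, len(c.servers))}
	copy(out.servers, c.servers)
	return out
}

type node struct {
	latest atomic.Value // always holds a config value
}

// setLatest mirrors setLatestConfiguration: store a clone for outside readers.
func (n *node) setLatest(c config) {
	n.latest.Store(c.clone())
}

// getLatest mirrors getLatestConfiguration: lock-free read, with a zero value
// when nothing has been stored yet.
func (n *node) getLatest() config {
	switch c := n.latest.Load().(type) {
	case config:
		return c
	default:
		return config{}
	}
}

func main() {
	n := &node{}
	n.setLatest(config{servers: []string{"a", "b"}})
	fmt.Println(n.getLatest().servers)
}
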

View File

@ -100,7 +100,7 @@ func (s *followerReplication) notifyAll(leader bool) {
s.notifyLock.Unlock()
// Submit our votes
for v, _ := range n {
for v := range n {
v.vote(leader)
}
}
@ -182,7 +182,7 @@ PIPELINE:
// to standard mode on failure.
if err := r.pipelineReplicate(s); err != nil {
if err != ErrPipelineReplicationNotSupported {
r.logger.Error(fmt.Sprintf("Failed to start pipeline replication to %s: %s", s.peer, err))
r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
}
}
goto RPC
@ -215,7 +215,7 @@ START:
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error(fmt.Sprintf("Failed to AppendEntries to %v: %v", s.peer, err))
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
@ -245,7 +245,7 @@ START:
} else {
s.failures++
}
r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, atomic.LoadUint64(&s.nextIndex)))
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
CHECK_MORE:
@ -272,7 +272,7 @@ SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop {
return true
} else if err != nil {
r.logger.Error(fmt.Sprintf("Failed to send snapshot to %v: %v", s.peer, err))
r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
return
}
@ -286,7 +286,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
// Get the snapshots
snapshots, err := r.snapshots.List()
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err))
r.logger.Error("failed to list snapshots", "error", err)
return false, err
}
@ -299,7 +299,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
snapID := snapshots[0].ID
meta, snapshot, err := r.snapshots.Open(snapID)
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapID, err))
r.logger.Error("failed to open snapshot", "id", snapID, "error", err)
return false, err
}
defer snapshot.Close()
@ -314,7 +314,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
LastLogTerm: meta.Term,
Peers: meta.Peers,
Size: meta.Size,
Configuration: encodeConfiguration(meta.Configuration),
Configuration: EncodeConfiguration(meta.Configuration),
ConfigurationIndex: meta.ConfigurationIndex,
}
@ -322,7 +322,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
start := time.Now()
var resp InstallSnapshotResponse
if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
r.logger.Error(fmt.Sprintf("Failed to install snapshot %v: %v", snapID, err))
r.logger.Error("failed to install snapshot", "id", snapID, "error", err)
s.failures++
return false, err
}
@ -350,7 +350,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
s.notifyAll(true)
} else {
s.failures++
r.logger.Warn(fmt.Sprintf("InstallSnapshot to %v rejected", s.peer))
r.logger.Warn("installSnapshot rejected to", "peer", s.peer)
}
return false, nil
}
@ -377,7 +377,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error(fmt.Sprintf("Failed to heartbeat to %v: %v", s.peer.Address, err))
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
@ -405,8 +405,8 @@ func (r *Raft) pipelineReplicate(s *followerReplication) error {
defer pipeline.Close()
// Log start and stop of pipeline
r.logger.Info(fmt.Sprintf("pipelining replication to peer %v", s.peer))
defer r.logger.Info(fmt.Sprintf("aborting pipeline replication to peer %v", s.peer))
r.logger.Info("pipelining replication", "peer", s.peer)
defer r.logger.Info("aborting pipeline replication", "peer", s.peer)
// Create a shutdown and finish channel
stopCh := make(chan struct{})
@ -467,7 +467,7 @@ func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *u
// Pipeline the append entries
if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil {
r.logger.Error(fmt.Sprintf("Failed to pipeline AppendEntries to %v: %v", s.peer, err))
r.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err)
return true
}
@ -543,7 +543,7 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error
} else {
var l Log
if err := r.logs.GetLog(nextIndex-1, &l); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", nextIndex-1, err))
r.logger.Error("failed to get log", "index", nextIndex-1, "error", err)
return err
}
@ -562,7 +562,7 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
for i := nextIndex; i <= maxIndex; i++ {
oldLog := new(Log)
if err := r.logs.GetLog(i, oldLog); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", i, err))
r.logger.Error("failed to get log", "index", i, "error", err)
return err
}
req.Entries = append(req.Entries, oldLog)
@ -578,7 +578,7 @@ func appendStats(peer string, start time.Time, logs float32) {
// handleStaleTerm is used when a follower indicates that we have a stale term.
func (r *Raft) handleStaleTerm(s *followerReplication) {
r.logger.Error(fmt.Sprintf("peer %v has newer term, stopping replication", s.peer))
r.logger.Error("peer has newer term, stopping replication", "peer", s.peer)
s.notifyAll(false) // No longer leader
asyncNotifyCh(s.stepDown)
}
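The heartbeat loop above waits between retries via backoff(failureWait, failures, maxFailureScale). A hedged sketch of that kind of capped exponential backoff; the helper and the constants below are illustrative, not raft's actual implementation or values:

package main

import (
	"fmt"
	"time"
)

// backoffWait doubles a base wait per failure round, capped at a scale limit.
func backoffWait(base time.Duration, round, limit uint64) time.Duration {
	power := round
	if power > limit {
		power = limit
	}
	wait := base
	for power > 2 {
		wait *= 2
		power--
	}
	return wait
}

func main() {
	for failures := uint64(1); failures <= 6; failures++ {
		fmt.Println(failures, backoffWait(10*time.Millisecond, failures, 4))
	}
}
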

View File

@ -77,14 +77,14 @@ func (r *Raft) runSnapshots() {
// Trigger a snapshot
if _, err := r.takeSnapshot(); err != nil {
r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err))
r.logger.Error("failed to take snapshot", "error", err)
}
case future := <-r.userSnapshotCh:
// User-triggered, run immediately
id, err := r.takeSnapshot()
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err))
r.logger.Error("failed to take snapshot", "error", err)
} else {
future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
return r.snapshots.Open(id)
@ -107,7 +107,7 @@ func (r *Raft) shouldSnapshot() bool {
// Check the last log index
lastIdx, err := r.logs.LastIndex()
if err != nil {
r.logger.Error(fmt.Sprintf("Failed to get last log index: %v", err))
r.logger.Error("failed to get last log index", "error", err)
return false
}
@ -172,7 +172,7 @@ func (r *Raft) takeSnapshot() (string, error) {
}
// Create a new snapshot.
r.logger.Info(fmt.Sprintf("Starting snapshot up to %d", snapReq.index))
r.logger.Info("starting snapshot up to", "index", snapReq.index)
start := time.Now()
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
@ -202,7 +202,7 @@ func (r *Raft) takeSnapshot() (string, error) {
return "", err
}
r.logger.Info(fmt.Sprintf("Snapshot to %d complete", snapReq.index))
r.logger.Info("snapshot complete up to", "index", snapReq.index)
return sink.ID(), nil
}
@ -228,8 +228,12 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// after the snapshot to be removed.
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
// Log this
r.logger.Info(fmt.Sprintf("Compacting logs from %d to %d", minLog, maxLog))
if minLog > maxLog {
r.logger.Info("no logs to truncate")
return nil
}
r.logger.Info("compacting logs", "from", minLog, "to", maxLog)
// Compact the logs
if err := r.logs.DeleteRange(minLog, maxLog); err != nil {
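The compactLogs hunk above adds an early return when minLog > maxLog, i.e. when the trailing-log window already reaches past the snapshot index and there is nothing to truncate. A small stand-alone sketch of that range computation and guard (function and variable names here are illustrative):

package main

import "fmt"

// truncationRange keeps trailingLogs entries behind the head of the log, never
// truncates past the snapshot index, and reports false for an empty range.
func truncationRange(minLog, snapIdx, lastLogIdx, trailingLogs uint64) (uint64, uint64, bool) {
	if lastLogIdx <= trailingLogs {
		return 0, 0, false // the trailing window covers the whole log
	}
	maxLog := snapIdx
	if keepFrom := lastLogIdx - trailingLogs; keepFrom < maxLog {
		maxLog = keepFrom
	}
	if minLog > maxLog {
		return 0, 0, false // nothing to truncate
	}
	return minLog, maxLog, true
}

func main() {
	fmt.Println(truncationRange(1, 50, 60, 100))  // trailing window too large: skip
	fmt.Println(truncationRange(1, 50, 200, 100)) // compact entries 1..50
}
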

View File

@ -2,8 +2,8 @@ package raft
import (
"errors"
"github.com/hashicorp/go-hclog"
"io"
"log"
"net"
"time"
)
@ -40,7 +40,7 @@ func NewTCPTransportWithLogger(
advertise net.Addr,
maxPool int,
timeout time.Duration,
logger *log.Logger,
logger hclog.Logger,
) (*NetworkTransport, error) {
return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger)
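With NewTCPTransportWithLogger now taking an hclog.Logger instead of a *log.Logger, a caller wires the transport up roughly as in the hedged sketch below; the bind address, pool size, and timeout are example values, not prescribed ones:

package main

import (
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

func main() {
	logger := hclog.New(&hclog.LoggerOptions{
		Name:  "raft-transport",
		Level: hclog.Debug,
	})

	// A nil advertise address advertises the listener's own address; the pool
	// size and timeout here are illustrative.
	trans, err := raft.NewTCPTransportWithLogger("127.0.0.1:0", nil, 3, 10*time.Second, logger)
	if err != nil {
		logger.Error("failed to create transport", "error", err)
		return
	}
	defer trans.Close()

	logger.Info("transport listening", "address", trans.LocalAddr())
}
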

View File

@ -5,7 +5,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"reflect"
"sync"
@ -16,6 +15,10 @@ import (
"github.com/hashicorp/go-msgpack/codec"
)
var (
userSnapshotErrorsOnNoData = true
)
// Return configurations optimized for in-memory
func inmemConfig(t *testing.T) *Config {
conf := DefaultConfig()
@ -151,12 +154,18 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) {
return len(d), nil
}
func newTestLogger(t *testing.T) *log.Logger {
return log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds)
func newTestLogger(t *testing.T) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Output: &testLoggerAdapter{t: t},
Level: hclog.DefaultLevel,
})
}
func newTestLoggerWithPrefix(t *testing.T, prefix string) *log.Logger {
return log.New(&testLoggerAdapter{t: t, prefix: prefix}, "", log.Lmicroseconds)
func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Output: &testLoggerAdapter{t: t, prefix: prefix},
Level: hclog.DefaultLevel,
})
}
func newTestLeveledLogger(t *testing.T) hclog.Logger {
@ -185,7 +194,7 @@ type cluster struct {
conf *Config
propagateTimeout time.Duration
longstopTimeout time.Duration
logger *log.Logger
logger hclog.Logger
startTime time.Time
failedLock sync.Mutex
@ -222,7 +231,7 @@ func (c *cluster) notifyFailed() {
// thread to block until all goroutines have completed in order to reliably
// fail tests using this function.
func (c *cluster) Failf(format string, args ...interface{}) {
c.logger.Printf(format, args...)
c.logger.Error(fmt.Sprintf(format, args...))
c.t.Fail()
c.notifyFailed()
}
@ -233,7 +242,7 @@ func (c *cluster) Failf(format string, args ...interface{}) {
// other goroutines created during the test. Calling FailNowf does not stop
// those other goroutines.
func (c *cluster) FailNowf(format string, args ...interface{}) {
c.logger.Printf(format, args...)
c.logger.Error(fmt.Sprintf(format, args...))
c.t.FailNow()
}
@ -254,7 +263,7 @@ func (c *cluster) Close() {
for _, f := range futures {
if err := f.Error(); err != nil {
c.FailNowf("[ERR] shutdown future err: %v", err)
c.FailNowf("shutdown future err: %v", err)
}
}
@ -316,7 +325,7 @@ CHECK:
c.t.FailNow()
case <-limitCh:
c.FailNowf("[ERR] Timeout waiting for replication")
c.FailNowf("timeout waiting for replication")
case <-ch:
for _, fsmRaw := range c.fsms {
@ -354,7 +363,7 @@ func (c *cluster) pollState(s RaftState) ([]*Raft, uint64) {
// GetInState polls the state of the cluster and attempts to identify when it has
// settled into the given state.
func (c *cluster) GetInState(s RaftState) []*Raft {
c.logger.Printf("[INFO] Starting stability test for raft state: %+v", s)
c.logger.Info("starting stability test", "raft-state", s)
limitCh := time.After(c.longstopTimeout)
// An election should complete after 2 * max(HeartbeatTimeout, ElectionTimeout)
@ -411,17 +420,18 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
c.t.FailNow()
case <-limitCh:
c.FailNowf("[ERR] Timeout waiting for stable %s state", s)
c.FailNowf("timeout waiting for stable %s state", s)
case <-c.WaitEventChan(filter, 0):
c.logger.Printf("[DEBUG] Resetting stability timeout")
c.logger.Debug("resetting stability timeout")
case t, ok := <-timer.C:
if !ok {
c.FailNowf("[ERR] Timer channel errored")
c.FailNowf("timer channel errored")
}
c.logger.Printf("[INFO] Stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))
c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)))
return inState
}
}
@ -431,7 +441,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
func (c *cluster) Leader() *Raft {
leaders := c.GetInState(Leader)
if len(leaders) != 1 {
c.FailNowf("[ERR] expected one leader: %v", leaders)
c.FailNowf("expected one leader: %v", leaders)
}
return leaders[0]
}
@ -442,14 +452,14 @@ func (c *cluster) Followers() []*Raft {
expFollowers := len(c.rafts) - 1
followers := c.GetInState(Follower)
if len(followers) != expFollowers {
c.FailNowf("[ERR] timeout waiting for %d followers (followers are %v)", expFollowers, followers)
c.FailNowf("timeout waiting for %d followers (followers are %v)", expFollowers, followers)
}
return followers
}
// FullyConnect connects all the transports together.
func (c *cluster) FullyConnect() {
c.logger.Printf("[DEBUG] Fully Connecting")
c.logger.Debug("fully connecting")
for i, t1 := range c.trans {
for j, t2 := range c.trans {
if i != j {
@ -462,7 +472,7 @@ func (c *cluster) FullyConnect() {
// Disconnect disconnects all transports from the given address.
func (c *cluster) Disconnect(a ServerAddress) {
c.logger.Printf("[DEBUG] Disconnecting %v", a)
c.logger.Debug("disconnecting", "address", a)
for _, t := range c.trans {
if t.LocalAddr() == a {
t.DisconnectAll()
@ -475,7 +485,7 @@ func (c *cluster) Disconnect(a ServerAddress) {
// Partition keeps the given list of addresses connected but isolates them
// from the other members of the cluster.
func (c *cluster) Partition(far []ServerAddress) {
c.logger.Printf("[DEBUG] Partitioning %v", far)
c.logger.Debug("partitioning", "addresses", far)
// Gather the set of nodes on the "near" side of the partition (we
// will call the supplied list of nodes the "far" side).
@ -500,7 +510,7 @@ OUTER:
t.Disconnect(a)
}
} else {
for a, _ := range near {
for a := range near {
t.Disconnect(a)
}
}
@ -530,15 +540,15 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) {
leader = "[none]"
}
if expect == "" {
c.logger.Printf("[ERR] Peer %s sees leader %v expected [none]", r, leader)
c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", "[none]")
} else {
c.logger.Printf("[ERR] Peer %s sees leader %v expected %v", r, leader, expect)
c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", expect)
}
fail = true
}
}
if fail {
c.FailNowf("[ERR] At least one peer has the wrong notion of leader")
c.FailNowf("at least one peer has the wrong notion of leader")
}
}
@ -559,7 +569,7 @@ CHECK:
if len(first.logs) != len(fsm.logs) {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("[ERR] FSM log length mismatch: %d %d",
c.FailNowf("FSM log length mismatch: %d %d",
len(first.logs), len(fsm.logs))
} else {
goto WAIT
@ -570,7 +580,7 @@ CHECK:
if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("[ERR] FSM log mismatch at index %d", idx)
c.FailNowf("FSM log mismatch at index %d", idx)
} else {
goto WAIT
}
@ -579,7 +589,7 @@ CHECK:
if len(first.configurations) != len(fsm.configurations) {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("[ERR] FSM configuration length mismatch: %d %d",
c.FailNowf("FSM configuration length mismatch: %d %d",
len(first.logs), len(fsm.logs))
} else {
goto WAIT
@ -590,7 +600,7 @@ CHECK:
if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("[ERR] FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx])
c.FailNowf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx])
} else {
goto WAIT
}
@ -613,7 +623,7 @@ WAIT:
func (c *cluster) getConfiguration(r *Raft) Configuration {
future := r.GetConfiguration()
if err := future.Error(); err != nil {
c.FailNowf("[ERR] failed to get configuration: %v", err)
c.FailNowf("failed to get configuration: %v", err)
return Configuration{}
}
@ -634,7 +644,7 @@ CHECK:
otherSet := c.getConfiguration(raft)
if !reflect.DeepEqual(peerSet, otherSet) {
if time.Now().After(limit) {
c.FailNowf("[ERR] peer mismatch: %+v %+v", peerSet, otherSet)
c.FailNowf("peer mismatch: %+v %+v", peerSet, otherSet)
} else {
goto WAIT
}
@ -687,7 +697,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
for i := 0; i < opts.Peers; i++ {
dir, err := ioutil.TempDir("", "raft")
if err != nil {
c.FailNowf("[ERR] err: %v ", err)
c.FailNowf("err: %v", err)
}
store := NewInmemStore()
@ -742,18 +752,18 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
if opts.Bootstrap {
err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration)
if err != nil {
c.FailNowf("[ERR] BootstrapCluster failed: %v", err)
c.FailNowf("BootstrapCluster failed: %v", err)
}
}
raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans)
if err != nil {
c.FailNowf("[ERR] NewRaft failed: %v", err)
c.FailNowf("NewRaft failed: %v", err)
}
raft.RegisterObserver(NewObserver(c.observationCh, false, nil))
if err != nil {
c.FailNowf("[ERR] RegisterObserver failed: %v", err)
c.FailNowf("RegisterObserver failed: %v", err)
}
c.rafts = append(c.rafts, raft)
}

29 vendor/github.com/hashicorp/raft/testing_batch.go generated vendored Normal file
View File

@ -0,0 +1,29 @@
// +build batchtest
package raft
func init() {
userSnapshotErrorsOnNoData = false
}
// ApplyBatch enables MockFSM to satisfy the BatchingFSM interface. This
// function is gated by the batchtest build flag.
//
// NOTE: This is exposed for middleware testing purposes and is not a stable API
func (m *MockFSM) ApplyBatch(logs []*Log) []interface{} {
m.Lock()
defer m.Unlock()
ret := make([]interface{}, len(logs))
for i, log := range logs {
switch log.Type {
case LogCommand:
m.logs = append(m.logs, log.Data)
ret[i] = len(m.logs)
default:
ret[i] = nil
}
}
return ret
}
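The new file above is compiled only when the batchtest build tag is set (e.g. go test -tags batchtest), which flips userSnapshotErrorsOnNoData and lets the existing tests exercise the batched FSM path. As a hedged illustration of what the BatchingFSM interface mentioned in that comment asks of an application FSM, assuming ApplyBatch plus the base FSM methods; the batchFSM type below is made up:

package main

import (
	"io"
	"sync"

	"github.com/hashicorp/raft"
)

// batchFSM applies each committed log and returns one response per log,
// the same shape MockFSM.ApplyBatch uses above.
type batchFSM struct {
	mu   sync.Mutex
	logs [][]byte
}

func (f *batchFSM) Apply(l *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.logs = append(f.logs, l.Data)
	return len(f.logs)
}

func (f *batchFSM) ApplyBatch(logs []*raft.Log) []interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	ret := make([]interface{}, len(logs))
	for i, l := range logs {
		if l.Type == raft.LogCommand {
			f.logs = append(f.logs, l.Data)
			ret[i] = len(f.logs)
		}
	}
	return ret
}

// Stubs so the sketch also satisfies the base FSM interface.
func (f *batchFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, nil }
func (f *batchFSM) Restore(rc io.ReadCloser) error      { return rc.Close() }

var _ raft.BatchingFSM = (*batchFSM)(nil)

func main() {}
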

View File

@ -13,7 +13,7 @@ import (
)
func init() {
// Ensure we use a high-entropy seed for the psuedo-random generator
// Ensure we use a high-entropy seed for the pseudo-random generator
rand.Seed(newSeed())
}

2 vendor/modules.txt vendored
View File

@ -245,7 +245,7 @@ github.com/hashicorp/mdns
github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v1.1.1
# github.com/hashicorp/raft v1.1.2
github.com/hashicorp/raft
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/raft-boltdb