From 9c1361c02b8625d613c46c39796d6ed5ba7367a8 Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Mon, 20 Jan 2020 13:58:02 +0100 Subject: [PATCH] raft: update raft to v1.1.2 (#7079) * update raft * use hclogger for raft. --- agent/consul/server.go | 15 +- go.mod | 2 +- go.sum | 6 +- .../hashicorp/raft/.golangci-lint.yml | 49 +++ vendor/github.com/hashicorp/raft/.travis.yml | 5 +- vendor/github.com/hashicorp/raft/CHANGELOG.md | 19 +- vendor/github.com/hashicorp/raft/Makefile | 35 +- vendor/github.com/hashicorp/raft/api.go | 128 ++++--- vendor/github.com/hashicorp/raft/commands.go | 16 +- vendor/github.com/hashicorp/raft/config.go | 23 +- .../hashicorp/raft/configuration.go | 8 +- .../hashicorp/raft/discard_snapshot.go | 15 + .../hashicorp/raft/file_snapshot.go | 77 +++-- vendor/github.com/hashicorp/raft/fsm.go | 95 +++++- vendor/github.com/hashicorp/raft/future.go | 13 +- .../hashicorp/raft/inmem_snapshot.go | 2 + .../hashicorp/raft/inmem_transport.go | 13 +- vendor/github.com/hashicorp/raft/log.go | 4 +- .../hashicorp/raft/net_transport.go | 33 +- vendor/github.com/hashicorp/raft/observer.go | 2 +- vendor/github.com/hashicorp/raft/raft.go | 320 +++++++++++------- .../github.com/hashicorp/raft/replication.go | 34 +- vendor/github.com/hashicorp/raft/snapshot.go | 18 +- .../hashicorp/raft/tcp_transport.go | 4 +- vendor/github.com/hashicorp/raft/testing.go | 80 +++-- .../hashicorp/raft/testing_batch.go | 29 ++ vendor/github.com/hashicorp/raft/util.go | 2 +- vendor/modules.txt | 2 +- 28 files changed, 700 insertions(+), 349 deletions(-) create mode 100644 vendor/github.com/hashicorp/raft/.golangci-lint.yml create mode 100644 vendor/github.com/hashicorp/raft/testing_batch.go diff --git a/agent/consul/server.go b/agent/consul/server.go index 834abc2bca..25e339ab7c 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -580,13 +580,20 @@ func (s *Server) setupRaft() error { serverAddressProvider = s.serverLookup } + raftLogger := hclog.New(&hclog.LoggerOptions{ + Name: "raft", + Level: hclog.LevelFromString(s.config.LogLevel), + Output: s.config.LogOutput, + TimeFormat: `2006/01/02 15:04:05`, + }) + // Create a transport layer. transConfig := &raft.NetworkTransportConfig{ Stream: s.raftLayer, MaxPool: 3, Timeout: 10 * time.Second, ServerAddressProvider: serverAddressProvider, - Logger: s.logger, + Logger: raftLogger, } trans := raft.NewNetworkTransportWithConfig(transConfig) @@ -594,12 +601,6 @@ func (s *Server) setupRaft() error { // Make sure we set the LogOutput. s.config.RaftConfig.LogOutput = s.config.LogOutput - raftLogger := hclog.New(&hclog.LoggerOptions{ - Name: "raft", - Level: hclog.LevelFromString(s.config.LogLevel), - Output: s.config.LogOutput, - TimeFormat: `2006/01/02 15:04:05`, - }) s.config.RaftConfig.Logger = raftLogger // Versions of the Raft protocol below 3 require the LocalID to match the network diff --git a/go.mod b/go.mod index e7c603c5d8..113978b65b 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/hashicorp/mdns v1.0.1 // indirect github.com/hashicorp/memberlist v0.1.5 github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 - github.com/hashicorp/raft v1.1.1 + github.com/hashicorp/raft v1.1.2 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/serf v0.8.5 github.com/hashicorp/vault/api v1.0.4 diff --git a/go.sum b/go.sum index bd38a33d19..ac0b3e4bf2 100644 --- a/go.sum +++ b/go.sum @@ -187,6 +187,8 @@ github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q= github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c= +github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0= @@ -349,8 +351,6 @@ github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59b golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= -golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/crypto v0.0.0-20191106202628-ed6320f186d4 h1:PDpCLFAH/YIX0QpHPf2eO7L4rC2OOirBrKtXTLLiNTY= @@ -398,8 +398,6 @@ golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c h1:hDn6jm7snBX2O7+EeTk6Q4WXJ golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= -golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/vendor/github.com/hashicorp/raft/.golangci-lint.yml b/vendor/github.com/hashicorp/raft/.golangci-lint.yml new file mode 100644 index 0000000000..a021e196ee --- /dev/null +++ b/vendor/github.com/hashicorp/raft/.golangci-lint.yml @@ -0,0 +1,49 @@ +run: + deadline: 5m + +linters-settings: + govet: + check-shadowing: true + golint: + min-confidence: 0 + +linters: + disable-all: true + enable: + - gofmt + #- golint + - govet + #- varcheck + #- typecheck + #- gosimple + +issues: + exclude-use-default: false + exclude: + # ignore the false positive erros resulting from not including a comment above every `package` keyword + - should have a package comment, unless it's in another file for this package (golint) + # golint: Annoying issue about not having a comment. The rare codebase has such comments + # - (comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form) + # errcheck: Almost all programs ignore errors on these functions and in most cases it's ok + - Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*printf?|os\.(Un)?Setenv). is not checked + + # golint: False positive when tests are defined in package 'test' + - func name will be used as test\.Test.* by other packages, and that stutters; consider calling this + + # staticcheck: Developers tend to write in C-style with an + # explicit 'break' in a 'switch', so it's ok to ignore + - ineffective break statement. Did you mean to break out of the outer loop + # gosec: Too many false-positives on 'unsafe' usage + - Use of unsafe calls should be audited + + # gosec: Too many false-positives for parametrized shell calls + - Subprocess launch(ed with variable|ing should be audited) + + # gosec: Duplicated errcheck checks + - G104 + + # gosec: Too many issues in popular repos + - (Expect directory permissions to be 0750 or less|Expect file permissions to be 0600 or less) + + # gosec: False positive is triggered by 'src, err := ioutil.ReadFile(filename)' + - Potential file inclusion via variable diff --git a/vendor/github.com/hashicorp/raft/.travis.yml b/vendor/github.com/hashicorp/raft/.travis.yml index faeb11ffe0..badd7ff92e 100644 --- a/vendor/github.com/hashicorp/raft/.travis.yml +++ b/vendor/github.com/hashicorp/raft/.travis.yml @@ -8,7 +8,10 @@ go: - 1.12 - tip -install: make deps +install: + - make deps + - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin latest + script: - make integ diff --git a/vendor/github.com/hashicorp/raft/CHANGELOG.md b/vendor/github.com/hashicorp/raft/CHANGELOG.md index efd56b0fed..a2c3a0ac82 100644 --- a/vendor/github.com/hashicorp/raft/CHANGELOG.md +++ b/vendor/github.com/hashicorp/raft/CHANGELOG.md @@ -1,4 +1,21 @@ -# UNRELEASED +# 1.1.2 (January 17th, 2020) + +FEATURES + +* Improve FSM apply performance through batching. Implementing the `BatchingFSM` interface enables this new feature [[GH-364](https://github.com/hashicorp/raft/pull/364)] +* Add ability to obtain Raft configuration before Raft starts with GetConfiguration [[GH-369](https://github.com/hashicorp/raft/pull/369)] + +IMPROVEMENTS + +* Remove lint violations and add a `make` rule for running the linter. +* Replace logger with hclog [[GH-360](https://github.com/hashicorp/raft/pull/360)] +* Read latest configuration independently from main loop [[GH-379](https://github.com/hashicorp/raft/pull/379)] + +BUG FIXES + +* Export the leader field in LeaderObservation [[GH-357](https://github.com/hashicorp/raft/pull/357)] +* Fix snapshot to not attempt to truncate a negative range [[GH-358](https://github.com/hashicorp/raft/pull/358)] +* Check for shutdown in inmemPipeline before sending RPCs [[GH-276](https://github.com/hashicorp/raft/pull/276)] # 1.1.1 (July 23rd, 2019) diff --git a/vendor/github.com/hashicorp/raft/Makefile b/vendor/github.com/hashicorp/raft/Makefile index b4ef1a94df..8ce7c06ab1 100644 --- a/vendor/github.com/hashicorp/raft/Makefile +++ b/vendor/github.com/hashicorp/raft/Makefile @@ -1,30 +1,57 @@ DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) +ENV = $(shell go env GOPATH) +GO_VERSION = $(shell go version) +GOLANG_CI_VERSION = v1.19.0 + +# Look for versions prior to 1.10 which have a different fmt output +# and don't lint with gofmt against them. +ifneq (,$(findstring go version go1.8, $(GO_VERSION))) + FMT= +else ifneq (,$(findstring go version go1.9, $(GO_VERSION))) + FMT= +else + FMT=--enable gofmt +endif + TEST_RESULTS_DIR?=/tmp/test-results test: - go test -timeout=60s -race . + go test $(TESTARGS) -timeout=60s -race . + go test $(TESTARGS) -timeout=60s -tags batchtest -race . integ: test - INTEG_TESTS=yes go test -timeout=25s -run=Integ . + INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ . + INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ . ci.test-norace: gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s + gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest ci.test: gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race . + gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest . ci.integ: ci.test INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ . + INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest . fuzz: - go test -timeout=300s ./fuzzy + go test $(TESTARGS) -timeout=20m ./fuzzy + go test $(TESTARGS) -timeout=20m -tags batchtest ./fuzzy deps: go get -t -d -v ./... echo $(DEPS) | xargs -n1 go get -d +lint: + gofmt -s -w . + golangci-lint run -c .golangci-lint.yml $(FMT) . + +dep-linter: + curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(ENV)/bin $(GOLANG_CI_VERSION) + cov: INTEG_TESTS=yes gocov test github.com/hashicorp/raft | gocov-html > /tmp/coverage.html open /tmp/coverage.html -.PHONY: test cov integ deps +.PHONY: test cov integ deps dep-linter lint diff --git a/vendor/github.com/hashicorp/raft/api.go b/vendor/github.com/hashicorp/raft/api.go index 9c61fefbee..03f43601d4 100644 --- a/vendor/github.com/hashicorp/raft/api.go +++ b/vendor/github.com/hashicorp/raft/api.go @@ -7,11 +7,11 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" - "github.com/hashicorp/go-hclog" - - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" + hclog "github.com/hashicorp/go-hclog" ) const ( @@ -139,6 +139,10 @@ type Raft struct { // the log/snapshot. configurations configurations + // Holds a copy of the latest configuration which can be read + // independently from main loop. + latestConfiguration atomic.Value + // RPC chan comes from the transport layer rpcCh <-chan RPC @@ -234,7 +238,7 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, entry.Data = encodePeers(configuration, trans) } else { entry.Type = LogConfiguration - entry.Data = encodeConfiguration(configuration) + entry.Data = EncodeConfiguration(configuration) } if err := logs.StoreLog(entry); err != nil { return fmt.Errorf("failed to append configuration entry to log: %v", err) @@ -288,37 +292,36 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, // expect data to be there and it's not. By refusing, we force them // to show intent to start a cluster fresh by explicitly doing a // bootstrap, rather than quietly fire up a fresh cluster here. - hasState, err := HasExistingState(logs, stable, snaps) - if err != nil { + if hasState, err := HasExistingState(logs, stable, snaps); err != nil { return fmt.Errorf("failed to check for existing state: %v", err) - } - if !hasState { + } else if !hasState { return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error") } // Attempt to restore any snapshots we find, newest to oldest. - var snapshotIndex uint64 - var snapshotTerm uint64 - snapshots, err := snaps.List() + var ( + snapshotIndex uint64 + snapshotTerm uint64 + snapshots, err = snaps.List() + ) if err != nil { return fmt.Errorf("failed to list snapshots: %v", err) } for _, snapshot := range snapshots { - if !conf.NoSnapshotRestoreOnStart { - _, source, err := snaps.Open(snapshot.ID) - if err != nil { - // Skip this one and try the next. We will detect if we - // couldn't open any snapshots. - continue - } + var source io.ReadCloser + _, source, err = snaps.Open(snapshot.ID) + if err != nil { + // Skip this one and try the next. We will detect if we + // couldn't open any snapshots. + continue + } - err = fsm.Restore(source) - // Close the source after the restore has completed - source.Close() - if err != nil { - // Same here, skip and try the next one. - continue - } + err = fsm.Restore(source) + // Close the source after the restore has completed + source.Close() + if err != nil { + // Same here, skip and try the next one. + continue } snapshotIndex = snapshot.Index @@ -341,7 +344,7 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, } for index := snapshotIndex + 1; index <= lastLogIndex; index++ { var entry Log - if err := logs.GetLog(index, &entry); err != nil { + if err = logs.GetLog(index, &entry); err != nil { return fmt.Errorf("failed to get log at index %d: %v", index, err) } if entry.Type == LogCommand { @@ -362,10 +365,10 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, if err != nil { return fmt.Errorf("failed to create snapshot: %v", err) } - if err := snapshot.Persist(sink); err != nil { + if err = snapshot.Persist(sink); err != nil { return fmt.Errorf("failed to persist snapshot: %v", err) } - if err := sink.Close(); err != nil { + if err = sink.Close(); err != nil { return fmt.Errorf("failed to finalize snapshot: %v", err) } @@ -382,6 +385,23 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, return nil } +// GetConfiguration returns the configuration of the Raft cluster without +// starting a Raft instance or connecting to the cluster +// This function has identical behavior to Raft.GetConfiguration +func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore, + snaps SnapshotStore, trans Transport) (Configuration, error) { + conf.skipStartup = true + r, err := NewRaft(conf, fsm, logs, stable, snaps, trans) + if err != nil { + return Configuration{}, err + } + future := r.GetConfiguration() + if err = future.Error(); err != nil { + return Configuration{}, err + } + return future.Configuration(), nil +} + // HasExistingState returns true if the server has any existing state (logs, // knowledge of a current term, or any snapshots). func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) { @@ -528,19 +548,23 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna for index := snapshotIndex + 1; index <= lastLog.Index; index++ { var entry Log if err := r.logs.GetLog(index, &entry); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", index, err)) + r.logger.Error("failed to get log", "index", index, "error", err) panic(err) } r.processConfigurationLogEntry(&entry) } - r.logger.Info(fmt.Sprintf("Initial configuration (index=%d): %+v", - r.configurations.latestIndex, r.configurations.latest.Servers)) + r.logger.Info("initial configuration", + "index", r.configurations.latestIndex, + "servers", hclog.Fmt("%+v", r.configurations.latest.Servers)) // Setup a heartbeat fast-path to avoid head-of-line // blocking where possible. It MUST be safe for this // to be called concurrently with a blocking RPC. trans.SetHeartbeatHandler(r.processHeartbeat) + if conf.skipStartup { + return r, nil + } // Start the background work. r.goFunc(r.run) r.goFunc(r.runFSM) @@ -554,7 +578,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna func (r *Raft) restoreSnapshot() error { snapshots, err := r.snapshots.List() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err)) + r.logger.Error("failed to list snapshots", "error", err) return err } @@ -563,7 +587,7 @@ func (r *Raft) restoreSnapshot() error { if !r.conf.NoSnapshotRestoreOnStart { _, source, err := r.snapshots.Open(snapshot.ID) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapshot.ID, err)) + r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err) continue } @@ -571,11 +595,11 @@ func (r *Raft) restoreSnapshot() error { // Close the source after the restore has completed source.Close() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to restore snapshot %v: %v", snapshot.ID, err)) + r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err) continue } - r.logger.Info(fmt.Sprintf("Restored from snapshot %v", snapshot.ID)) + r.logger.Info("restored from snapshot", "id", snapshot.ID) } // Update the lastApplied so we don't replay old logs r.setLastApplied(snapshot.Index) @@ -584,18 +608,17 @@ func (r *Raft) restoreSnapshot() error { r.setLastSnapshot(snapshot.Index, snapshot.Term) // Update the configuration + var conf Configuration + var index uint64 if snapshot.Version > 0 { - r.configurations.committed = snapshot.Configuration - r.configurations.committedIndex = snapshot.ConfigurationIndex - r.configurations.latest = snapshot.Configuration - r.configurations.latestIndex = snapshot.ConfigurationIndex + conf = snapshot.Configuration + index = snapshot.ConfigurationIndex } else { - configuration := decodePeers(snapshot.Peers, r.trans) - r.configurations.committed = configuration - r.configurations.committedIndex = snapshot.Index - r.configurations.latest = configuration - r.configurations.latestIndex = snapshot.Index + conf = decodePeers(snapshot.Peers, r.trans) + index = snapshot.Index } + r.setCommittedConfiguration(conf, index) + r.setLatestConfiguration(conf, index) // Success! return nil @@ -727,19 +750,14 @@ func (r *Raft) VerifyLeader() Future { } } -// GetConfiguration returns the latest configuration and its associated index -// currently in use. This may not yet be committed. This must not be called on -// the main thread (which can access the information directly). +// GetConfiguration returns the latest configuration. This may not yet be +// committed. The main loop can access this directly. func (r *Raft) GetConfiguration() ConfigurationFuture { configReq := &configurationsFuture{} configReq.init() - select { - case <-r.shutdownCh: - configReq.respond(ErrRaftShutdown) - return configReq - case r.configurationsCh <- configReq: - return configReq - } + configReq.configurations = configurations{latest: r.getLatestConfiguration()} + configReq.respond(nil) + return configReq } // AddPeer (deprecated) is used to add a new peer into the cluster. This must be @@ -1013,7 +1031,7 @@ func (r *Raft) Stats() map[string]string { future := r.GetConfiguration() if err := future.Error(); err != nil { - r.logger.Warn(fmt.Sprintf("could not get configuration for Stats: %v", err)) + r.logger.Warn("could not get configuration for stats", "error", err) } else { configuration := future.Configuration() s["latest_configuration_index"] = toString(future.Index()) diff --git a/vendor/github.com/hashicorp/raft/commands.go b/vendor/github.com/hashicorp/raft/commands.go index 17416311d8..3358a32847 100644 --- a/vendor/github.com/hashicorp/raft/commands.go +++ b/vendor/github.com/hashicorp/raft/commands.go @@ -35,7 +35,7 @@ type AppendEntriesRequest struct { LeaderCommitIndex uint64 } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -59,7 +59,7 @@ type AppendEntriesResponse struct { NoRetryBackoff bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -83,7 +83,7 @@ type RequestVoteRequest struct { LeadershipTransfer bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *RequestVoteRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -104,7 +104,7 @@ type RequestVoteResponse struct { Granted bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *RequestVoteResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -136,7 +136,7 @@ type InstallSnapshotRequest struct { Size int64 } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -150,7 +150,7 @@ type InstallSnapshotResponse struct { Success bool } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -161,7 +161,7 @@ type TimeoutNowRequest struct { RPCHeader } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader { return r.RPCHeader } @@ -171,7 +171,7 @@ type TimeoutNowResponse struct { RPCHeader } -// See WithRPCHeader. +// GetRPCHeader - See WithRPCHeader. func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } diff --git a/vendor/github.com/hashicorp/raft/config.go b/vendor/github.com/hashicorp/raft/config.go index e43ba54496..3ec564df44 100644 --- a/vendor/github.com/hashicorp/raft/config.go +++ b/vendor/github.com/hashicorp/raft/config.go @@ -8,8 +8,8 @@ import ( "github.com/hashicorp/go-hclog" ) -// These are the versions of the protocol (which includes RPC messages as -// well as Raft-specific log entries) that this server can _understand_. Use +// ProtocolVersion is the version of the protocol (which includes RPC messages +// as well as Raft-specific log entries) that this server can _understand_. Use // the ProtocolVersion member of the Config object to control the version of // the protocol to use when _speaking_ to other servers. Note that depending on // the protocol version being spoken, some otherwise understood RPC messages @@ -88,13 +88,15 @@ import ( type ProtocolVersion int const ( + // ProtocolVersionMin is the minimum protocol version ProtocolVersionMin ProtocolVersion = 0 - ProtocolVersionMax = 3 + // ProtocolVersionMax is the maximum protocol version + ProtocolVersionMax = 3 ) -// These are versions of snapshots that this server can _understand_. Currently, -// it is always assumed that this server generates the latest version, though -// this may be changed in the future to include a configurable version. +// SnapshotVersion is the version of snapshots that this server can understand. +// Currently, it is always assumed that the server generates the latest version, +// though this may be changed in the future to include a configurable version. // // Version History // @@ -112,8 +114,10 @@ const ( type SnapshotVersion int const ( + // SnapshotVersionMin is the minimum snapshot version SnapshotVersionMin SnapshotVersion = 0 - SnapshotVersionMax = 1 + // SnapshotVersionMax is the maximum snapshot version + SnapshotVersionMax = 1 ) // Config provides any necessary configuration for the Raft server. @@ -202,10 +206,13 @@ type Config struct { // NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the // FSM on start. This is useful if your FSM recovers from other mechanisms - // than raft snapshotting. Snapshot metadata will still be used to initalize + // than raft snapshotting. Snapshot metadata will still be used to initialize // raft's configuration and index values. This is used in NewRaft and // RestoreCluster. NoSnapshotRestoreOnStart bool + + // skipStartup allows NewRaft() to bypass all background work goroutines + skipStartup bool } // DefaultConfig returns a Config with usable defaults. diff --git a/vendor/github.com/hashicorp/raft/configuration.go b/vendor/github.com/hashicorp/raft/configuration.go index 4902fb1e97..bf19997b65 100644 --- a/vendor/github.com/hashicorp/raft/configuration.go +++ b/vendor/github.com/hashicorp/raft/configuration.go @@ -342,9 +342,9 @@ func decodePeers(buf []byte, trans Transport) Configuration { } } -// encodeConfiguration serializes a Configuration using MsgPack, or panics on +// EncodeConfiguration serializes a Configuration using MsgPack, or panics on // errors. -func encodeConfiguration(configuration Configuration) []byte { +func EncodeConfiguration(configuration Configuration) []byte { buf, err := encodeMsgPack(configuration) if err != nil { panic(fmt.Errorf("failed to encode configuration: %v", err)) @@ -352,9 +352,9 @@ func encodeConfiguration(configuration Configuration) []byte { return buf.Bytes() } -// decodeConfiguration deserializes a Configuration using MsgPack, or panics on +// DecodeConfiguration deserializes a Configuration using MsgPack, or panics on // errors. -func decodeConfiguration(buf []byte) Configuration { +func DecodeConfiguration(buf []byte) Configuration { var configuration Configuration if err := decodeMsgPack(buf, &configuration); err != nil { panic(fmt.Errorf("failed to decode configuration: %v", err)) diff --git a/vendor/github.com/hashicorp/raft/discard_snapshot.go b/vendor/github.com/hashicorp/raft/discard_snapshot.go index 5e93a9fe01..fb15d4d3ea 100644 --- a/vendor/github.com/hashicorp/raft/discard_snapshot.go +++ b/vendor/github.com/hashicorp/raft/discard_snapshot.go @@ -12,6 +12,11 @@ import ( // suitable for testing. type DiscardSnapshotStore struct{} +// DiscardSnapshotSink is used to fulfill the SnapshotSink interface +// while always discarding the . This is useful for when the log +// should be truncated but no snapshot should be retained. This +// should never be used for production use, and is only suitable +// for testing. type DiscardSnapshotSink struct{} // NewDiscardSnapshotStore is used to create a new DiscardSnapshotStore. @@ -19,31 +24,41 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore { return &DiscardSnapshotStore{} } +// Create returns a valid type implementing the SnapshotSink which +// always discards the snapshot. func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) { return &DiscardSnapshotSink{}, nil } +// List returns successfully with a nil for []*SnapshotMeta. func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error) { return nil, nil } +// Open returns an error since the DiscardSnapshotStore does not +// support opening snapshots. func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) { return nil, nil, fmt.Errorf("open is not supported") } +// Write returns successfully with the length of the input byte slice +// to satisfy the WriteCloser interface func (d *DiscardSnapshotSink) Write(b []byte) (int, error) { return len(b), nil } +// Close returns a nil error func (d *DiscardSnapshotSink) Close() error { return nil } +// ID returns "discard" for DiscardSnapshotSink func (d *DiscardSnapshotSink) ID() string { return "discard" } +// Cancel returns successfully with a nil error func (d *DiscardSnapshotSink) Cancel() error { return nil } diff --git a/vendor/github.com/hashicorp/raft/file_snapshot.go b/vendor/github.com/hashicorp/raft/file_snapshot.go index ffc9414542..d1604493e0 100644 --- a/vendor/github.com/hashicorp/raft/file_snapshot.go +++ b/vendor/github.com/hashicorp/raft/file_snapshot.go @@ -5,11 +5,11 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/hashicorp/go-hclog" "hash" "hash/crc64" "io" "io/ioutil" - "log" "os" "path/filepath" "runtime" @@ -31,7 +31,7 @@ const ( type FileSnapshotStore struct { path string retain int - logger *log.Logger + logger hclog.Logger } type snapMetaSlice []*fileSnapshotMeta @@ -39,7 +39,7 @@ type snapMetaSlice []*fileSnapshotMeta // FileSnapshotSink implements SnapshotSink with a file. type FileSnapshotSink struct { store *FileSnapshotStore - logger *log.Logger + logger hclog.Logger dir string parentDir string meta fileSnapshotMeta @@ -76,12 +76,16 @@ func (b *bufferedFile) Close() error { // NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based // on a base directory. The `retain` parameter controls how many // snapshots are retained. Must be at least 1. -func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger) (*FileSnapshotStore, error) { +func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) { if retain < 1 { return nil, fmt.Errorf("must retain at least one snapshot") } if logger == nil { - logger = log.New(os.Stderr, "", log.LstdFlags) + logger = hclog.New(&hclog.LoggerOptions{ + Name: "snapshot", + Output: hclog.DefaultOutput, + Level: hclog.DefaultLevel, + }) } // Ensure our path exists @@ -111,7 +115,11 @@ func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSn if logOutput == nil { logOutput = os.Stderr } - return NewFileSnapshotStoreWithLogger(base, retain, log.New(logOutput, "", log.LstdFlags)) + return NewFileSnapshotStoreWithLogger(base, retain, hclog.New(&hclog.LoggerOptions{ + Name: "snapshot", + Output: logOutput, + Level: hclog.DefaultLevel, + })) } // testPermissions tries to touch a file in our path to see if it works. @@ -150,11 +158,11 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, // Create a new path name := snapshotName(term, index) path := filepath.Join(f.path, name+tmpSuffix) - f.logger.Printf("[INFO] snapshot: Creating new snapshot at %s", path) + f.logger.Info("creating new snapshot", "path", path) // Make the directory if err := os.MkdirAll(path, 0755); err != nil { - f.logger.Printf("[ERR] snapshot: Failed to make snapshot directory: %v", err) + f.logger.Error("failed to make snapshot directly", "error", err) return nil, err } @@ -180,7 +188,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, // Write out the meta data if err := sink.writeMeta(); err != nil { - f.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err) + f.logger.Error("failed to write metadata", "error", err) return nil, err } @@ -188,7 +196,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, statePath := filepath.Join(path, stateFilePath) fh, err := os.Create(statePath) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to create state file: %v", err) + f.logger.Error("failed to create state file", "error", err) return nil, err } sink.stateFile = fh @@ -209,7 +217,7 @@ func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) { // Get the eligible snapshots snapshots, err := f.getSnapshots() if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err) + f.logger.Error("failed to get snapshots", "error", err) return nil, err } @@ -228,7 +236,7 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) { // Get the eligible snapshots snapshots, err := ioutil.ReadDir(f.path) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to scan snapshot dir: %v", err) + f.logger.Error("failed to scan snapshot directory", "error", err) return nil, err } @@ -243,20 +251,20 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) { // Ignore any temporary snapshots dirName := snap.Name() if strings.HasSuffix(dirName, tmpSuffix) { - f.logger.Printf("[WARN] snapshot: Found temporary snapshot: %v", dirName) + f.logger.Warn("found temporary snapshot", "name", dirName) continue } // Try to read the meta data meta, err := f.readMeta(dirName) if err != nil { - f.logger.Printf("[WARN] snapshot: Failed to read metadata for %v: %v", dirName, err) + f.logger.Warn("failed to read metadata", "name", dirName, "error", err) continue } // Make sure we can understand this version. if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax { - f.logger.Printf("[WARN] snapshot: Snapshot version for %v not supported: %d", dirName, meta.Version) + f.logger.Warn("snapshot version not supported", "name", dirName, "version", meta.Version) continue } @@ -297,7 +305,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error // Get the metadata meta, err := f.readMeta(id) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to get meta data to open snapshot: %v", err) + f.logger.Error("failed to get meta data to open snapshot", "error", err) return nil, nil, err } @@ -305,7 +313,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error statePath := filepath.Join(f.path, id, stateFilePath) fh, err := os.Open(statePath) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to open state file: %v", err) + f.logger.Error("failed to open state file", "error", err) return nil, nil, err } @@ -315,7 +323,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error // Compute the hash _, err = io.Copy(stateHash, fh) if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to read state file: %v", err) + f.logger.Error("failed to read state file", "error", err) fh.Close() return nil, nil, err } @@ -323,15 +331,14 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error // Verify the hash computed := stateHash.Sum(nil) if bytes.Compare(meta.CRC, computed) != 0 { - f.logger.Printf("[ERR] snapshot: CRC checksum failed (stored: %v computed: %v)", - meta.CRC, computed) + f.logger.Error("CRC checksum failed", "stored", meta.CRC, "computed", computed) fh.Close() return nil, nil, fmt.Errorf("CRC mismatch") } // Seek to the start if _, err := fh.Seek(0, 0); err != nil { - f.logger.Printf("[ERR] snapshot: State file seek failed: %v", err) + f.logger.Error("state file seek failed", "error", err) fh.Close() return nil, nil, err } @@ -349,15 +356,15 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error func (f *FileSnapshotStore) ReapSnapshots() error { snapshots, err := f.getSnapshots() if err != nil { - f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err) + f.logger.Error("failed to get snapshots", "error", err) return err } for i := f.retain; i < len(snapshots); i++ { path := filepath.Join(f.path, snapshots[i].ID) - f.logger.Printf("[INFO] snapshot: reaping snapshot %v", path) + f.logger.Info("reaping snapshot", "path", path) if err := os.RemoveAll(path); err != nil { - f.logger.Printf("[ERR] snapshot: Failed to reap snapshot %v: %v", path, err) + f.logger.Error("failed to reap snapshot", "path", path, "error", err) return err } } @@ -386,9 +393,9 @@ func (s *FileSnapshotSink) Close() error { // Close the open handles if err := s.finalize(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err) + s.logger.Error("failed to finalize snapshot", "error", err) if delErr := os.RemoveAll(s.dir); delErr != nil { - s.logger.Printf("[ERR] snapshot: Failed to delete temporary snapshot directory at path %v: %v", s.dir, delErr) + s.logger.Error("failed to delete temporary snapshot directory", "path", s.dir, "error", delErr) return delErr } return err @@ -396,27 +403,27 @@ func (s *FileSnapshotSink) Close() error { // Write out the meta data if err := s.writeMeta(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err) + s.logger.Error("failed to write metadata", "error", err) return err } // Move the directory into place newPath := strings.TrimSuffix(s.dir, tmpSuffix) if err := os.Rename(s.dir, newPath); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to move snapshot into place: %v", err) + s.logger.Error("failed to move snapshot into place", "error", err) return err } - if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems + if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems parentFH, err := os.Open(s.parentDir) defer parentFH.Close() if err != nil { - s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err) + s.logger.Error("failed to open snapshot parent directory", "path", s.parentDir, "error", err) return err } if err = parentFH.Sync(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err) + s.logger.Error("failed syncing parent directory", "path", s.parentDir, "error", err) return err } } @@ -439,7 +446,7 @@ func (s *FileSnapshotSink) Cancel() error { // Close the open handles if err := s.finalize(); err != nil { - s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err) + s.logger.Error("failed to finalize snapshot", "error", err) return err } @@ -480,9 +487,11 @@ func (s *FileSnapshotSink) finalize() error { // writeMeta is used to write out the metadata we have. func (s *FileSnapshotSink) writeMeta() error { + var err error // Open the meta file metaPath := filepath.Join(s.dir, metaFilePath) - fh, err := os.Create(metaPath) + var fh *os.File + fh, err = os.Create(metaPath) if err != nil { return err } @@ -493,7 +502,7 @@ func (s *FileSnapshotSink) writeMeta() error { // Write out as JSON enc := json.NewEncoder(buffered) - if err := enc.Encode(&s.meta); err != nil { + if err = enc.Encode(&s.meta); err != nil { return err } diff --git a/vendor/github.com/hashicorp/raft/fsm.go b/vendor/github.com/hashicorp/raft/fsm.go index 8ad9b5995a..5622ebf828 100644 --- a/vendor/github.com/hashicorp/raft/fsm.go +++ b/vendor/github.com/hashicorp/raft/fsm.go @@ -31,6 +31,26 @@ type FSM interface { Restore(io.ReadCloser) error } +// BatchingFSM extends the FSM interface to add an ApplyBatch function. This can +// optionally be implemented by clients to enable multiple logs to be applied to +// the FSM in batches. Up to MaxAppendEntries could be sent in a batch. +type BatchingFSM interface { + // ApplyBatch is invoked once a batch of log entries has been committed and + // are ready to be applied to the FSM. ApplyBatch will take in an array of + // log entries. These log entries will be in the order they were committed, + // will not have gaps, and could be of a few log types. Clients should check + // the log type prior to attempting to decode the data attached. Presently + // the LogCommand and LogConfiguration types will be sent. + // + // The returned slice must be the same length as the input and each response + // should correlate to the log at the same index of the input. The returned + // values will be made available in the ApplyFuture returned by Raft.Apply + // method if that method was called on the same Raft node as the FSM. + ApplyBatch([]*Log) []interface{} + + FSM +} + // FSMSnapshot is returned by an FSM in response to a Snapshot // It must be safe to invoke FSMSnapshot methods with concurrent // calls to Apply. @@ -49,7 +69,10 @@ type FSMSnapshot interface { func (r *Raft) runFSM() { var lastIndex, lastTerm uint64 - commit := func(req *commitTuple) { + batchingFSM, batchingEnabled := r.fsm.(BatchingFSM) + configStore, configStoreEnabled := r.fsm.(ConfigurationStore) + + commitSingle := func(req *commitTuple) { // Apply the log if a command or config change var resp interface{} // Make sure we send a response @@ -68,15 +91,14 @@ func (r *Raft) runFSM() { metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) case LogConfiguration: - configStore, ok := r.fsm.(ConfigurationStore) - if !ok { + if !configStoreEnabled { // Return early to avoid incrementing the index and term for // an unimplemented operation. return } start := time.Now() - configStore.StoreConfiguration(req.log.Index, decodeConfiguration(req.log.Data)) + configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data)) metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start) } @@ -85,6 +107,67 @@ func (r *Raft) runFSM() { lastTerm = req.log.Term } + commitBatch := func(reqs []*commitTuple) { + if !batchingEnabled { + for _, ct := range reqs { + commitSingle(ct) + } + return + } + + // Only send LogCommand and LogConfiguration log types. LogBarrier types + // will not be sent to the FSM. + shouldSend := func(l *Log) bool { + switch l.Type { + case LogCommand, LogConfiguration: + return true + } + return false + } + + var lastBatchIndex, lastBatchTerm uint64 + sendLogs := make([]*Log, 0, len(reqs)) + for _, req := range reqs { + if shouldSend(req.log) { + sendLogs = append(sendLogs, req.log) + } + lastBatchIndex = req.log.Index + lastBatchTerm = req.log.Term + } + + var responses []interface{} + if len(sendLogs) > 0 { + start := time.Now() + responses = batchingFSM.ApplyBatch(sendLogs) + metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start) + metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs))) + + // Ensure we get the expected responses + if len(sendLogs) != len(responses) { + panic("invalid number of responses") + } + } + + // Update the indexes + lastIndex = lastBatchIndex + lastTerm = lastBatchTerm + + var i int + for _, req := range reqs { + var resp interface{} + // If the log was sent to the FSM, retrieve the response. + if shouldSend(req.log) { + resp = responses[i] + i++ + } + + if req.future != nil { + req.future.response = resp + req.future.respond(nil) + } + } + } + restore := func(req *restoreFuture) { // Open the snapshot meta, source, err := r.snapshots.Open(req.ID) @@ -132,8 +215,8 @@ func (r *Raft) runFSM() { select { case ptr := <-r.fsmMutateCh: switch req := ptr.(type) { - case *commitTuple: - commit(req) + case []*commitTuple: + commitBatch(req) case *restoreFuture: restore(req) diff --git a/vendor/github.com/hashicorp/raft/future.go b/vendor/github.com/hashicorp/raft/future.go index cc1e905ef0..6346b453b1 100644 --- a/vendor/github.com/hashicorp/raft/future.go +++ b/vendor/github.com/hashicorp/raft/future.go @@ -183,14 +183,13 @@ type userSnapshotFuture struct { func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) { if u.opener == nil { return nil, nil, fmt.Errorf("no snapshot available") - } else { - // Invalidate the opener so it can't get called multiple times, - // which isn't generally safe. - defer func() { - u.opener = nil - }() - return u.opener() } + // Invalidate the opener so it can't get called multiple times, + // which isn't generally safe. + defer func() { + u.opener = nil + }() + return u.opener() } // userRestoreFuture is used for waiting on a user-triggered restore of an diff --git a/vendor/github.com/hashicorp/raft/inmem_snapshot.go b/vendor/github.com/hashicorp/raft/inmem_snapshot.go index ad52f93aef..641d9d8172 100644 --- a/vendor/github.com/hashicorp/raft/inmem_snapshot.go +++ b/vendor/github.com/hashicorp/raft/inmem_snapshot.go @@ -100,10 +100,12 @@ func (s *InmemSnapshotSink) Close() error { return nil } +// ID returns the ID of the SnapshotMeta func (s *InmemSnapshotSink) ID() string { return s.meta.ID } +// Cancel returns successfully with a nil error func (s *InmemSnapshotSink) Cancel() error { return nil } diff --git a/vendor/github.com/hashicorp/raft/inmem_transport.go b/vendor/github.com/hashicorp/raft/inmem_transport.go index 7f493f4871..6196dfa404 100644 --- a/vendor/github.com/hashicorp/raft/inmem_transport.go +++ b/vendor/github.com/hashicorp/raft/inmem_transport.go @@ -24,7 +24,7 @@ type inmemPipeline struct { shutdown bool shutdownCh chan struct{} - shutdownLock sync.Mutex + shutdownLock sync.RWMutex } type inmemPipelineInflight struct { @@ -314,6 +314,17 @@ func (i *inmemPipeline) AppendEntries(args *AppendEntriesRequest, resp *AppendEn Command: args, RespChan: respCh, } + + // Check if we have been already shutdown, otherwise the random choose + // made by select statement below might pick consumerCh even if + // shutdownCh was closed. + i.shutdownLock.RLock() + shutdown := i.shutdown + i.shutdownLock.RUnlock() + if shutdown { + return nil, ErrPipelineShutdown + } + select { case i.peer.consumerCh <- rpc: case <-timeout: diff --git a/vendor/github.com/hashicorp/raft/log.go b/vendor/github.com/hashicorp/raft/log.go index c15a84472f..ad3bf0f09d 100644 --- a/vendor/github.com/hashicorp/raft/log.go +++ b/vendor/github.com/hashicorp/raft/log.go @@ -10,12 +10,12 @@ const ( // LogNoop is used to assert leadership. LogNoop - // LogAddPeer is used to add a new peer. This should only be used with + // LogAddPeerDeprecated is used to add a new peer. This should only be used with // older protocol versions designed to be compatible with unversioned // Raft servers. See comments in config.go for details. LogAddPeerDeprecated - // LogRemovePeer is used to remove an existing peer. This should only be + // LogRemovePeerDeprecated is used to remove an existing peer. This should only be // used with older protocol versions designed to be compatible with // unversioned Raft servers. See comments in config.go for details. LogRemovePeerDeprecated diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index 523fa698e5..6092cafbcf 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -5,8 +5,8 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-hclog" "io" - "log" "net" "os" "sync" @@ -66,7 +66,7 @@ type NetworkTransport struct { heartbeatFn func(RPC) heartbeatFnLock sync.Mutex - logger *log.Logger + logger hclog.Logger maxPool int @@ -92,7 +92,7 @@ type NetworkTransportConfig struct { // ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC ServerAddressProvider ServerAddressProvider - Logger *log.Logger + Logger hclog.Logger // Dialer Stream StreamLayer @@ -105,6 +105,7 @@ type NetworkTransportConfig struct { Timeout time.Duration } +// ServerAddressProvider is a target address to which we invoke an RPC when establishing a connection type ServerAddressProvider interface { ServerAddr(id ServerID) (ServerAddress, error) } @@ -148,7 +149,11 @@ func NewNetworkTransportWithConfig( config *NetworkTransportConfig, ) *NetworkTransport { if config.Logger == nil { - config.Logger = log.New(os.Stderr, "", log.LstdFlags) + config.Logger = hclog.New(&hclog.LoggerOptions{ + Name: "raft-net", + Output: hclog.DefaultOutput, + Level: hclog.DefaultLevel, + }) } trans := &NetworkTransport{ connPool: make(map[ServerAddress][]*netConn), @@ -182,7 +187,11 @@ func NewNetworkTransport( if logOutput == nil { logOutput = os.Stderr } - logger := log.New(logOutput, "", log.LstdFlags) + logger := hclog.New(&hclog.LoggerOptions{ + Name: "raft-net", + Output: logOutput, + Level: hclog.DefaultLevel, + }) config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} return NewNetworkTransportWithConfig(config) } @@ -195,7 +204,7 @@ func NewNetworkTransportWithLogger( stream StreamLayer, maxPool int, timeout time.Duration, - logger *log.Logger, + logger hclog.Logger, ) *NetworkTransport { config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} return NewNetworkTransportWithConfig(config) @@ -310,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv if n.serverAddressProvider != nil { serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) if err != nil { - n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err) + n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err) } else { return serverAddressOverride } @@ -486,7 +495,7 @@ func (n *NetworkTransport) listen() { } if !n.IsShutdown() { - n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err) + n.logger.Error("failed to accept connection", "error", err) } select { @@ -499,7 +508,7 @@ func (n *NetworkTransport) listen() { // No error, reset loop delay loopDelay = 0 - n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) + n.logger.Debug("accepted connection", "local-address", n.LocalAddr(), "remote-address", conn.RemoteAddr().String()) // Handle the connection in dedicated routine go n.handleConn(n.getStreamContext(), conn) @@ -519,19 +528,19 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { for { select { case <-connCtx.Done(): - n.logger.Println("[DEBUG] raft-net: stream layer is closed") + n.logger.Debug("stream layer is closed") return default: } if err := n.handleCommand(r, dec, enc); err != nil { if err != io.EOF { - n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err) + n.logger.Error("failed to decode incoming command", "error", err) } return } if err := w.Flush(); err != nil { - n.logger.Printf("[ERR] raft-net: Failed to flush response: %v", err) + n.logger.Error("failed to flush response", "error", err) return } } diff --git a/vendor/github.com/hashicorp/raft/observer.go b/vendor/github.com/hashicorp/raft/observer.go index 2d4f37db12..1611d6b44b 100644 --- a/vendor/github.com/hashicorp/raft/observer.go +++ b/vendor/github.com/hashicorp/raft/observer.go @@ -18,7 +18,7 @@ type Observation struct { // LeaderObservation is used for the data when leadership changes. type LeaderObservation struct { - leader ServerAddress + Leader ServerAddress } // PeerObservation is sent to observers when peers change. diff --git a/vendor/github.com/hashicorp/raft/raft.go b/vendor/github.com/hashicorp/raft/raft.go index e1f6a760f7..fb8f4b308e 100644 --- a/vendor/github.com/hashicorp/raft/raft.go +++ b/vendor/github.com/hashicorp/raft/raft.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/go-hclog" + "github.com/armon/go-metrics" ) @@ -94,7 +96,7 @@ func (r *Raft) setLeader(leader ServerAddress) { r.leader = leader r.leaderLock.Unlock() if oldLeader != leader { - r.observe(LeaderObservation{leader: leader}) + r.observe(LeaderObservation{Leader: leader}) } } @@ -147,7 +149,7 @@ func (r *Raft) run() { // runFollower runs the FSM for a follower. func (r *Raft) runFollower() { didWarn := false - r.logger.Info(fmt.Sprintf("%v entering Follower state (Leader: %q)", r, r.Leader())) + r.logger.Info("entering follower state", "follower", r, "leader", r.Leader()) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout) @@ -209,7 +211,7 @@ func (r *Raft) runFollower() { didWarn = true } } else { - r.logger.Warn(fmt.Sprintf("Heartbeat timeout from %q reached, starting election", lastLeader)) + r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader) metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) r.setState(Candidate) return @@ -245,7 +247,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error { // runCandidate runs the FSM for a candidate. func (r *Raft) runCandidate() { - r.logger.Info(fmt.Sprintf("%v entering Candidate state in term %v", r, r.getCurrentTerm()+1)) + r.logger.Info("entering candidate state", "node", r, "term", r.getCurrentTerm()+1) metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1) // Start vote for us, and set a timeout @@ -263,7 +265,7 @@ func (r *Raft) runCandidate() { // Tally the votes, need a simple majority grantedVotes := 0 votesNeeded := r.quorumSize() - r.logger.Debug(fmt.Sprintf("Votes needed: %d", votesNeeded)) + r.logger.Debug("votes", "needed", votesNeeded) for r.getState() == Candidate { select { @@ -273,7 +275,7 @@ func (r *Raft) runCandidate() { case vote := <-voteCh: // Check if the term is greater than ours, bail if vote.Term > r.getCurrentTerm() { - r.logger.Debug("Newer term discovered, fallback to follower") + r.logger.Debug("newer term discovered, fallback to follower") r.setState(Follower) r.setCurrentTerm(vote.Term) return @@ -282,13 +284,12 @@ func (r *Raft) runCandidate() { // Check if the vote is granted if vote.Granted { grantedVotes++ - r.logger.Debug(fmt.Sprintf("Vote granted from %s in term %v. Tally: %d", - vote.voterID, vote.Term, grantedVotes)) + r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes) } // Check if we've become the leader if grantedVotes >= votesNeeded { - r.logger.Info(fmt.Sprintf("Election won. Tally: %d", grantedVotes)) + r.logger.Info("election won", "tally", grantedVotes) r.setState(Leader) r.setLeader(r.localAddr) return @@ -359,7 +360,7 @@ func (r *Raft) setupLeaderState() { // runLeader runs the FSM for a leader. Do the setup here and drop into // the leaderLoop for the hot loop. func (r *Raft) runLeader() { - r.logger.Info(fmt.Sprintf("%v entering Leader state", r)) + r.logger.Info("entering leader state", "leader", r) metrics.IncrCounter([]string{"raft", "state", "leader"}, 1) // Notify that we are the leader @@ -470,7 +471,7 @@ func (r *Raft) startStopReplication() { } inConfig[server.ID] = true if _, ok := r.leaderState.replState[server.ID]; !ok { - r.logger.Info(fmt.Sprintf("Added peer %v, starting replication", server.ID)) + r.logger.Info("added peer, starting replication", "peer", server.ID) s := &followerReplication{ peer: server, commitment: r.leaderState.commitment, @@ -497,7 +498,7 @@ func (r *Raft) startStopReplication() { continue } // Replicate up to lastIdx and stop - r.logger.Info(fmt.Sprintf("Removed peer %v, stopping replication after %v", serverID, lastIdx)) + r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx) repl.stopCh <- lastIdx close(repl.stopCh) delete(r.leaderState.replState, serverID) @@ -618,49 +619,58 @@ func (r *Raft) leaderLoop() { commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex) + // New configration has been committed, set it as the committed + // value. if r.configurations.latestIndex > oldCommitIndex && r.configurations.latestIndex <= commitIndex { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) if !hasVote(r.configurations.committed, r.localID) { stepDown = true } } - var numProcessed int start := time.Now() + var groupReady []*list.Element + var groupFutures = make(map[uint64]*logFuture) + var lastIdxInGroup uint64 - for { - e := r.leaderState.inflight.Front() - if e == nil { - break - } + // Pull all inflight logs that are committed off the queue. + for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() { commitLog := e.Value.(*logFuture) idx := commitLog.log.Index if idx > commitIndex { + // Don't go past the committed index break } + // Measure the commit time metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) + groupReady = append(groupReady, e) + groupFutures[idx] = commitLog + lastIdxInGroup = idx + } - r.processLogs(idx, commitLog) + // Process the group + if len(groupReady) != 0 { + r.processLogs(lastIdxInGroup, groupFutures) - r.leaderState.inflight.Remove(e) - numProcessed++ + for _, e := range groupReady { + r.leaderState.inflight.Remove(e) + } } // Measure the time to enqueue batch of logs for FSM to apply metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start) // Count the number of logs enqueued - metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed)) + metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady))) if stepDown { if r.conf.ShutdownOnRemove { - r.logger.Info("Removed ourself, shutting down") + r.logger.Info("removed ourself, shutting down") r.Shutdown() } else { - r.logger.Info("Removed ourself, transitioning to follower") + r.logger.Info("removed ourself, transitioning to follower") r.setState(Follower) } } @@ -672,7 +682,7 @@ func (r *Raft) leaderLoop() { } else if v.votes < v.quorumSize { // Early return, means there must be a new leader - r.logger.Warn("New leader elected, stepping down") + r.logger.Warn("new leader elected, stepping down") r.setState(Follower) delete(r.leaderState.notify, v) for _, repl := range r.leaderState.replState { @@ -867,9 +877,9 @@ func (r *Raft) checkLeaderLease() time.Duration { } else { // Log at least once at high value, then debug. Otherwise it gets very verbose. if diff <= 3*r.conf.LeaderLeaseTimeout { - r.logger.Warn(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff)) + r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff) } else { - r.logger.Debug(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff)) + r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff) } } metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond)) @@ -879,7 +889,7 @@ func (r *Raft) checkLeaderLease() time.Duration { // Verify we can contact a quorum quorum := r.quorumSize() if contacted < quorum { - r.logger.Warn("Failed to contact quorum of nodes, stepping down") + r.logger.Warn("failed to contact quorum of nodes, stepping down") r.setState(Follower) metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1) } @@ -967,7 +977,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { if err := sink.Close(); err != nil { return fmt.Errorf("failed to close snapshot: %v", err) } - r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n)) + r.logger.Info("copied to local snapshot", "bytes", n) // Restore the snapshot into the FSM. If this fails we are in a // bad state so we panic to take ourselves out. @@ -991,7 +1001,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { r.setLastApplied(lastIndex) r.setLastSnapshot(lastIndex, term) - r.logger.Info(fmt.Sprintf("Restored user snapshot (index %d)", lastIndex)) + r.logger.Info("restored user snapshot", "index", latestIndex) return nil } @@ -1005,8 +1015,11 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { return } - r.logger.Info(fmt.Sprintf("Updating configuration with %s (%v, %v) to %+v", - future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers)) + r.logger.Info("updating configuration", + "command", future.req.command, + "server-id", future.req.serverID, + "server-addr", future.req.serverAddress, + "servers", hclog.Fmt("%+v", configuration.Servers)) // In pre-ID compatibility mode we translate all configuration changes // in to an old remove peer message, which can handle all supported @@ -1023,14 +1036,13 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { } else { future.log = Log{ Type: LogConfiguration, - Data: encodeConfiguration(configuration), + Data: EncodeConfiguration(configuration), } } r.dispatchLogs([]*logFuture{&future.logFuture}) index := future.Index() - r.configurations.latest = configuration - r.configurations.latestIndex = index + r.setLatestConfiguration(configuration, index) r.leaderState.commitment.setConfiguration(configuration) r.startStopReplication() } @@ -1059,7 +1071,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { - r.logger.Error(fmt.Sprintf("Failed to commit logs: %v", err)) + r.logger.Error("failed to commit logs", "error", err) for _, applyLog := range applyLogs { applyLog.respond(err) } @@ -1081,72 +1093,88 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { // applied up to the given index limit. // This can be called from both leaders and followers. // Followers call this from AppendEntries, for n entries at a time, and always -// pass future=nil. -// Leaders call this once per inflight when entries are committed. They pass -// the future from inflights. -func (r *Raft) processLogs(index uint64, future *logFuture) { +// pass futures=nil. +// Leaders call this when entries are committed. They pass the futures from any +// inflight logs. +func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) { // Reject logs we've applied already lastApplied := r.getLastApplied() if index <= lastApplied { - r.logger.Warn(fmt.Sprintf("Skipping application of old log: %d", index)) + r.logger.Warn("skipping application of old log", "index", index) return } + applyBatch := func(batch []*commitTuple) { + select { + case r.fsmMutateCh <- batch: + case <-r.shutdownCh: + for _, cl := range batch { + if cl.future != nil { + cl.future.respond(ErrRaftShutdown) + } + } + } + } + + batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries) + // Apply all the preceding logs - for idx := r.getLastApplied() + 1; idx <= index; idx++ { + for idx := lastApplied + 1; idx <= index; idx++ { + var preparedLog *commitTuple // Get the log, either from the future or from our log store - if future != nil && future.log.Index == idx { - r.processLog(&future.log, future) + future, futureOk := futures[idx] + if futureOk { + preparedLog = r.prepareLog(&future.log, future) } else { l := new(Log) if err := r.logs.GetLog(idx, l); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", idx, err)) + r.logger.Error("failed to get log", "index", idx, "error", err) panic(err) } - r.processLog(l, nil) + preparedLog = r.prepareLog(l, nil) } - // Update the lastApplied index and term - r.setLastApplied(idx) + switch { + case preparedLog != nil: + // If we have a log ready to send to the FSM add it to the batch. + // The FSM thread will respond to the future. + batch = append(batch, preparedLog) + + // If we have filled up a batch, send it to the FSM + if len(batch) >= r.conf.MaxAppendEntries { + applyBatch(batch) + batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries) + } + + case futureOk: + // Invoke the future if given. + future.respond(nil) + } } + + // If there are any remaining logs in the batch apply them + if len(batch) != 0 { + applyBatch(batch) + } + + // Update the lastApplied index and term + r.setLastApplied(index) } // processLog is invoked to process the application of a single committed log entry. -func (r *Raft) processLog(l *Log, future *logFuture) { +func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple { switch l.Type { case LogBarrier: // Barrier is handled by the FSM fallthrough case LogCommand: - // Forward to the fsm handler - select { - case r.fsmMutateCh <- &commitTuple{l, future}: - case <-r.shutdownCh: - if future != nil { - future.respond(ErrRaftShutdown) - } - } - - // Return so that the future is only responded to - // by the FSM handler when the application is done - return + return &commitTuple{l, future} case LogConfiguration: // Only support this with the v2 configuration format if r.protocolVersion > 2 { - // Forward to the fsm handler - select { - case r.fsmMutateCh <- &commitTuple{l, future}: - case <-r.shutdownCh: - if future != nil { - future.respond(ErrRaftShutdown) - } - } - - // Return so that the future is only responded to - // by the FSM handler when the application is done - return + return &commitTuple{l, future} } case LogAddPeerDeprecated: case LogRemovePeerDeprecated: @@ -1157,10 +1185,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) { panic(fmt.Errorf("unrecognized log type: %#v", l)) } - // Invoke the future if given - if future != nil { - future.respond(nil) - } + return nil } // processRPC is called to handle an incoming RPC request. This must only be @@ -1181,7 +1206,8 @@ func (r *Raft) processRPC(rpc RPC) { case *TimeoutNowRequest: r.timeoutNow(rpc, cmd) default: - r.logger.Error(fmt.Sprintf("Got unexpected command: %#v", rpc.Command)) + r.logger.Error("got unexpected command", + "command", hclog.Fmt("%#v", rpc.Command)) rpc.Respond(nil, fmt.Errorf("unexpected command")) } } @@ -1204,7 +1230,7 @@ func (r *Raft) processHeartbeat(rpc RPC) { case *AppendEntriesRequest: r.appendEntries(rpc, cmd) default: - r.logger.Error(fmt.Sprintf("Expected heartbeat, got command: %#v", rpc.Command)) + r.logger.Error("expected heartbeat, got", "command", hclog.Fmt("%#v", rpc.Command)) rpc.Respond(nil, fmt.Errorf("unexpected command")) } } @@ -1254,8 +1280,10 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } else { var prevLog Log if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil { - r.logger.Warn(fmt.Sprintf("Failed to get previous log: %d %v (last: %d)", - a.PrevLogEntry, err, lastIdx)) + r.logger.Warn("failed to get previous log", + "previous-index", a.PrevLogEntry, + "last-index", lastIdx, + "error", err) resp.NoRetryBackoff = true return } @@ -1263,8 +1291,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } if a.PrevLogTerm != prevLogTerm { - r.logger.Warn(fmt.Sprintf("Previous log term mis-match: ours: %d remote: %d", - prevLogTerm, a.PrevLogTerm)) + r.logger.Warn("previous log term mis-match", + "ours", prevLogTerm, + "remote", a.PrevLogTerm) resp.NoRetryBackoff = true return } @@ -1284,19 +1313,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } var storeEntry Log if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil { - r.logger.Warn(fmt.Sprintf("Failed to get log entry %d: %v", - entry.Index, err)) + r.logger.Warn("failed to get log entry", + "index", entry.Index, + "error", err) return } if entry.Term != storeEntry.Term { - r.logger.Warn(fmt.Sprintf("Clearing log suffix from %d to %d", entry.Index, lastLogIdx)) + r.logger.Warn("clearing log suffix", + "from", entry.Index, + "to", lastLogIdx) if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil { - r.logger.Error(fmt.Sprintf("Failed to clear log suffix: %v", err)) + r.logger.Error("failed to clear log suffix", "error", err) return } if entry.Index <= r.configurations.latestIndex { - r.configurations.latest = r.configurations.committed - r.configurations.latestIndex = r.configurations.committedIndex + r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex) } newEntries = a.Entries[i:] break @@ -1306,7 +1337,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { if n := len(newEntries); n > 0 { // Append the new entries if err := r.logs.StoreLogs(newEntries); err != nil { - r.logger.Error(fmt.Sprintf("Failed to append to logs: %v", err)) + r.logger.Error("failed to append to logs", "error", err) // TODO: leaving r.getLastLog() in the wrong // state if there was a truncation above return @@ -1331,8 +1362,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { idx := min(a.LeaderCommitIndex, r.getLastIndex()) r.setCommitIndex(idx) if r.configurations.latestIndex <= idx { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) } r.processLogs(idx, nil) metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start) @@ -1349,15 +1379,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { // called from the main thread, or from NewRaft() before any threads have begun. func (r *Raft) processConfigurationLogEntry(entry *Log) { if entry.Type == LogConfiguration { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex - r.configurations.latest = decodeConfiguration(entry.Data) - r.configurations.latestIndex = entry.Index + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) + r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index) } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { - r.configurations.committed = r.configurations.latest - r.configurations.committedIndex = r.configurations.latestIndex - r.configurations.latest = decodePeers(entry.Data, r.trans) - r.configurations.latestIndex = entry.Index + r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex) + r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index) } } @@ -1389,8 +1415,9 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // vote! candidate := r.trans.DecodePeer(req.Candidate) if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { - r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since we have a leader: %v", - candidate, leader)) + r.logger.Warn("rejecting vote request since we have a leader", + "from", candidate, + "leader", leader) return } @@ -1402,7 +1429,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Increase the term if we see a newer one if req.Term > r.getCurrentTerm() { // Ensure transition to follower - r.logger.Debug("lost leadership because received a requestvote with newer term") + r.logger.Debug("lost leadership because received a requestVote with a newer term") r.setState(Follower) r.setCurrentTerm(req.Term) resp.Term = req.Term @@ -1411,20 +1438,20 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Check if we have voted yet lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm) if err != nil && err.Error() != "not found" { - r.logger.Error(fmt.Sprintf("Failed to get last vote term: %v", err)) + r.logger.Error("failed to get last vote term", "error", err) return } lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand) if err != nil && err.Error() != "not found" { - r.logger.Error(fmt.Sprintf("Failed to get last vote candidate: %v", err)) + r.logger.Error("failed to get last vote candidate", "error", err) return } // Check if we've voted in this election before if lastVoteTerm == req.Term && lastVoteCandBytes != nil { - r.logger.Info(fmt.Sprintf("Duplicate RequestVote for same term: %d", req.Term)) + r.logger.Info("duplicate requestVote for same term", "term", req.Term) if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { - r.logger.Warn(fmt.Sprintf("Duplicate RequestVote from candidate: %s", req.Candidate)) + r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate) resp.Granted = true } return @@ -1433,20 +1460,24 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Reject if their term is older lastIdx, lastTerm := r.getLastEntry() if lastTerm > req.LastLogTerm { - r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last term is greater (%d, %d)", - candidate, lastTerm, req.LastLogTerm)) + r.logger.Warn("rejecting vote request since our last term is greater", + "candidate", candidate, + "last-term", lastTerm, + "last-candidate-term", req.LastLogTerm) return } if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex { - r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last index is greater (%d, %d)", - candidate, lastIdx, req.LastLogIndex)) + r.logger.Warn("rejecting vote request since our last index is greater", + "candidate", candidate, + "last-index", lastIdx, + "last-candidate-index", req.LastLogIndex) return } // Persist a vote for safety if err := r.persistVote(req.Term, req.Candidate); err != nil { - r.logger.Error(fmt.Sprintf("Failed to persist vote: %v", err)) + r.logger.Error("failed to persist vote", "error", err) return } @@ -1481,8 +1512,9 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Ignore an older term if req.Term < r.getCurrentTerm() { - r.logger.Info(fmt.Sprintf("Ignoring installSnapshot request with older term of %d vs currentTerm %d", - req.Term, r.getCurrentTerm())) + r.logger.Info("ignoring installSnapshot request with older term than current term", + "request-term", req.Term, + "current-term", r.getCurrentTerm()) return } @@ -1501,7 +1533,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { var reqConfiguration Configuration var reqConfigurationIndex uint64 if req.SnapshotVersion > 0 { - reqConfiguration = decodeConfiguration(req.Configuration) + reqConfiguration = DecodeConfiguration(req.Configuration) reqConfigurationIndex = req.ConfigurationIndex } else { reqConfiguration = decodePeers(req.Peers, r.trans) @@ -1511,7 +1543,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm, reqConfiguration, reqConfigurationIndex, r.trans) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to create snapshot to install: %v", err)) + r.logger.Error("failed to create snapshot to install", "error", err) rpcErr = fmt.Errorf("failed to create snapshot: %v", err) return } @@ -1520,7 +1552,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { n, err := io.Copy(sink, rpc.Reader) if err != nil { sink.Cancel() - r.logger.Error(fmt.Sprintf("Failed to copy snapshot: %v", err)) + r.logger.Error("failed to copy snapshot", "error", err) rpcErr = err return } @@ -1528,18 +1560,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Check that we received it all if n != req.Size { sink.Cancel() - r.logger.Error(fmt.Sprintf("Failed to receive whole snapshot: %d / %d", n, req.Size)) + r.logger.Error("failed to receive whole snapshot", + "received", hclog.Fmt("%d / %d", n, req.Size)) rpcErr = fmt.Errorf("short read") return } // Finalize the snapshot if err := sink.Close(); err != nil { - r.logger.Error(fmt.Sprintf("Failed to finalize snapshot: %v", err)) + r.logger.Error("failed to finalize snapshot", "error", err) rpcErr = err return } - r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n)) + r.logger.Info("copied to local snapshot", "bytes", n) // Restore snapshot future := &restoreFuture{ID: sink.ID()} @@ -1553,7 +1586,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { // Wait for the restore to happen if err := future.Error(); err != nil { - r.logger.Error(fmt.Sprintf("Failed to restore snapshot: %v", err)) + r.logger.Error("failed to restore snapshot", "error", err) rpcErr = err return } @@ -1565,14 +1598,12 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm) // Restore the peer set - r.configurations.latest = reqConfiguration - r.configurations.latestIndex = reqConfigurationIndex - r.configurations.committed = reqConfiguration - r.configurations.committedIndex = reqConfigurationIndex + r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex) + r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex) // Compact logs, continue even if this fails if err := r.compactLogs(req.LastLogIndex); err != nil { - r.logger.Error(fmt.Sprintf("Failed to compact logs: %v", err)) + r.logger.Error("failed to compact logs", "error", err) } r.logger.Info("Installed remote snapshot") @@ -1622,7 +1653,9 @@ func (r *Raft) electSelf() <-chan *voteResult { resp := &voteResult{voterID: peer.ID} err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to make RequestVote RPC to %v: %v", peer, err)) + r.logger.Error("failed to make requestVote RPC", + "target", peer, + "error", err) resp.Term = req.Term resp.Granted = false } @@ -1636,7 +1669,7 @@ func (r *Raft) electSelf() <-chan *voteResult { if server.ID == r.localID { // Persist a vote for ourselves if err := r.persistVote(req.Term, req.Candidate); err != nil { - r.logger.Error(fmt.Sprintf("Failed to persist vote : %v", err)) + r.logger.Error("failed to persist vote", "error", err) return nil } // Include our own vote @@ -1753,3 +1786,30 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) { r.candidateFromLeadershipTransfer = true rpc.Respond(&TimeoutNowResponse{}, nil) } + +// setLatestConfiguration stores the latest configuration and updates a copy of it. +func (r *Raft) setLatestConfiguration(c Configuration, i uint64) { + r.configurations.latest = c + r.configurations.latestIndex = i + r.latestConfiguration.Store(c.Clone()) +} + +// setCommittedConfiguration stores the committed configuration. +func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) { + r.configurations.committed = c + r.configurations.committedIndex = i +} + +// getLatestConfiguration reads the configuration from a copy of the main +// configuration, which means it can be accessed independently from the main +// loop. +func (r *Raft) getLatestConfiguration() Configuration { + // this switch catches the case where this is called without having set + // a configuration previously. + switch c := r.latestConfiguration.Load().(type) { + case Configuration: + return c + default: + return Configuration{} + } +} diff --git a/vendor/github.com/hashicorp/raft/replication.go b/vendor/github.com/hashicorp/raft/replication.go index 900afc4c24..1e2f2db701 100644 --- a/vendor/github.com/hashicorp/raft/replication.go +++ b/vendor/github.com/hashicorp/raft/replication.go @@ -100,7 +100,7 @@ func (s *followerReplication) notifyAll(leader bool) { s.notifyLock.Unlock() // Submit our votes - for v, _ := range n { + for v := range n { v.vote(leader) } } @@ -182,7 +182,7 @@ PIPELINE: // to standard mode on failure. if err := r.pipelineReplicate(s); err != nil { if err != ErrPipelineReplicationNotSupported { - r.logger.Error(fmt.Sprintf("Failed to start pipeline replication to %s: %s", s.peer, err)) + r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err) } } goto RPC @@ -215,7 +215,7 @@ START: // Make the RPC call start = time.Now() if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { - r.logger.Error(fmt.Sprintf("Failed to AppendEntries to %v: %v", s.peer, err)) + r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err) s.failures++ return } @@ -245,7 +245,7 @@ START: } else { s.failures++ } - r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, atomic.LoadUint64(&s.nextIndex))) + r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex)) } CHECK_MORE: @@ -272,7 +272,7 @@ SEND_SNAP: if stop, err := r.sendLatestSnapshot(s); stop { return true } else if err != nil { - r.logger.Error(fmt.Sprintf("Failed to send snapshot to %v: %v", s.peer, err)) + r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err) return } @@ -286,7 +286,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Get the snapshots snapshots, err := r.snapshots.List() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err)) + r.logger.Error("failed to list snapshots", "error", err) return false, err } @@ -299,7 +299,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { snapID := snapshots[0].ID meta, snapshot, err := r.snapshots.Open(snapID) if err != nil { - r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapID, err)) + r.logger.Error("failed to open snapshot", "id", snapID, "error", err) return false, err } defer snapshot.Close() @@ -314,7 +314,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { LastLogTerm: meta.Term, Peers: meta.Peers, Size: meta.Size, - Configuration: encodeConfiguration(meta.Configuration), + Configuration: EncodeConfiguration(meta.Configuration), ConfigurationIndex: meta.ConfigurationIndex, } @@ -322,7 +322,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { start := time.Now() var resp InstallSnapshotResponse if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil { - r.logger.Error(fmt.Sprintf("Failed to install snapshot %v: %v", snapID, err)) + r.logger.Error("failed to install snapshot", "id", snapID, "error", err) s.failures++ return false, err } @@ -350,7 +350,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { s.notifyAll(true) } else { s.failures++ - r.logger.Warn(fmt.Sprintf("InstallSnapshot to %v rejected", s.peer)) + r.logger.Warn("installSnapshot rejected to", "peer", s.peer) } return false, nil } @@ -377,7 +377,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { start := time.Now() if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { - r.logger.Error(fmt.Sprintf("Failed to heartbeat to %v: %v", s.peer.Address, err)) + r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err) failures++ select { case <-time.After(backoff(failureWait, failures, maxFailureScale)): @@ -405,8 +405,8 @@ func (r *Raft) pipelineReplicate(s *followerReplication) error { defer pipeline.Close() // Log start and stop of pipeline - r.logger.Info(fmt.Sprintf("pipelining replication to peer %v", s.peer)) - defer r.logger.Info(fmt.Sprintf("aborting pipeline replication to peer %v", s.peer)) + r.logger.Info("pipelining replication", "peer", s.peer) + defer r.logger.Info("aborting pipeline replication", "peer", s.peer) // Create a shutdown and finish channel stopCh := make(chan struct{}) @@ -467,7 +467,7 @@ func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *u // Pipeline the append entries if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil { - r.logger.Error(fmt.Sprintf("Failed to pipeline AppendEntries to %v: %v", s.peer, err)) + r.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err) return true } @@ -543,7 +543,7 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error } else { var l Log if err := r.logs.GetLog(nextIndex-1, &l); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", nextIndex-1, err)) + r.logger.Error("failed to get log", "index", nextIndex-1, "error", err) return err } @@ -562,7 +562,7 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64 for i := nextIndex; i <= maxIndex; i++ { oldLog := new(Log) if err := r.logs.GetLog(i, oldLog); err != nil { - r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", i, err)) + r.logger.Error("failed to get log", "index", i, "error", err) return err } req.Entries = append(req.Entries, oldLog) @@ -578,7 +578,7 @@ func appendStats(peer string, start time.Time, logs float32) { // handleStaleTerm is used when a follower indicates that we have a stale term. func (r *Raft) handleStaleTerm(s *followerReplication) { - r.logger.Error(fmt.Sprintf("peer %v has newer term, stopping replication", s.peer)) + r.logger.Error("peer has newer term, stopping replication", "peer", s.peer) s.notifyAll(false) // No longer leader asyncNotifyCh(s.stepDown) } diff --git a/vendor/github.com/hashicorp/raft/snapshot.go b/vendor/github.com/hashicorp/raft/snapshot.go index 2e0f77a5dd..f4c3945145 100644 --- a/vendor/github.com/hashicorp/raft/snapshot.go +++ b/vendor/github.com/hashicorp/raft/snapshot.go @@ -77,14 +77,14 @@ func (r *Raft) runSnapshots() { // Trigger a snapshot if _, err := r.takeSnapshot(); err != nil { - r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err)) + r.logger.Error("failed to take snapshot", "error", err) } case future := <-r.userSnapshotCh: // User-triggered, run immediately id, err := r.takeSnapshot() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err)) + r.logger.Error("failed to take snapshot", "error", err) } else { future.opener = func() (*SnapshotMeta, io.ReadCloser, error) { return r.snapshots.Open(id) @@ -107,7 +107,7 @@ func (r *Raft) shouldSnapshot() bool { // Check the last log index lastIdx, err := r.logs.LastIndex() if err != nil { - r.logger.Error(fmt.Sprintf("Failed to get last log index: %v", err)) + r.logger.Error("failed to get last log index", "error", err) return false } @@ -172,7 +172,7 @@ func (r *Raft) takeSnapshot() (string, error) { } // Create a new snapshot. - r.logger.Info(fmt.Sprintf("Starting snapshot up to %d", snapReq.index)) + r.logger.Info("starting snapshot up to", "index", snapReq.index) start := time.Now() version := getSnapshotVersion(r.protocolVersion) sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans) @@ -202,7 +202,7 @@ func (r *Raft) takeSnapshot() (string, error) { return "", err } - r.logger.Info(fmt.Sprintf("Snapshot to %d complete", snapReq.index)) + r.logger.Info("snapshot complete up to", "index", snapReq.index) return sink.ID(), nil } @@ -228,8 +228,12 @@ func (r *Raft) compactLogs(snapIdx uint64) error { // after the snapshot to be removed. maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs) - // Log this - r.logger.Info(fmt.Sprintf("Compacting logs from %d to %d", minLog, maxLog)) + if minLog > maxLog { + r.logger.Info("no logs to truncate") + return nil + } + + r.logger.Info("compacting logs", "from", minLog, "to", maxLog) // Compact the logs if err := r.logs.DeleteRange(minLog, maxLog); err != nil { diff --git a/vendor/github.com/hashicorp/raft/tcp_transport.go b/vendor/github.com/hashicorp/raft/tcp_transport.go index 69c928ed92..ff40a57bcd 100644 --- a/vendor/github.com/hashicorp/raft/tcp_transport.go +++ b/vendor/github.com/hashicorp/raft/tcp_transport.go @@ -2,8 +2,8 @@ package raft import ( "errors" + "github.com/hashicorp/go-hclog" "io" - "log" "net" "time" ) @@ -40,7 +40,7 @@ func NewTCPTransportWithLogger( advertise net.Addr, maxPool int, timeout time.Duration, - logger *log.Logger, + logger hclog.Logger, ) (*NetworkTransport, error) { return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger) diff --git a/vendor/github.com/hashicorp/raft/testing.go b/vendor/github.com/hashicorp/raft/testing.go index 913070b069..70fd7b7955 100644 --- a/vendor/github.com/hashicorp/raft/testing.go +++ b/vendor/github.com/hashicorp/raft/testing.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" "reflect" "sync" @@ -16,6 +15,10 @@ import ( "github.com/hashicorp/go-msgpack/codec" ) +var ( + userSnapshotErrorsOnNoData = true +) + // Return configurations optimized for in-memory func inmemConfig(t *testing.T) *Config { conf := DefaultConfig() @@ -151,12 +154,18 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) { return len(d), nil } -func newTestLogger(t *testing.T) *log.Logger { - return log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds) +func newTestLogger(t *testing.T) hclog.Logger { + return hclog.New(&hclog.LoggerOptions{ + Output: &testLoggerAdapter{t: t}, + Level: hclog.DefaultLevel, + }) } -func newTestLoggerWithPrefix(t *testing.T, prefix string) *log.Logger { - return log.New(&testLoggerAdapter{t: t, prefix: prefix}, "", log.Lmicroseconds) +func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger { + return hclog.New(&hclog.LoggerOptions{ + Output: &testLoggerAdapter{t: t, prefix: prefix}, + Level: hclog.DefaultLevel, + }) } func newTestLeveledLogger(t *testing.T) hclog.Logger { @@ -185,7 +194,7 @@ type cluster struct { conf *Config propagateTimeout time.Duration longstopTimeout time.Duration - logger *log.Logger + logger hclog.Logger startTime time.Time failedLock sync.Mutex @@ -222,7 +231,7 @@ func (c *cluster) notifyFailed() { // thread to block until all goroutines have completed in order to reliably // fail tests using this function. func (c *cluster) Failf(format string, args ...interface{}) { - c.logger.Printf(format, args...) + c.logger.Error(fmt.Sprintf(format, args...)) c.t.Fail() c.notifyFailed() } @@ -233,7 +242,7 @@ func (c *cluster) Failf(format string, args ...interface{}) { // other goroutines created during the test. Calling FailNowf does not stop // those other goroutines. func (c *cluster) FailNowf(format string, args ...interface{}) { - c.logger.Printf(format, args...) + c.logger.Error(fmt.Sprintf(format, args...)) c.t.FailNow() } @@ -254,7 +263,7 @@ func (c *cluster) Close() { for _, f := range futures { if err := f.Error(); err != nil { - c.FailNowf("[ERR] shutdown future err: %v", err) + c.FailNowf("shutdown future err: %v", err) } } @@ -316,7 +325,7 @@ CHECK: c.t.FailNow() case <-limitCh: - c.FailNowf("[ERR] Timeout waiting for replication") + c.FailNowf("timeout waiting for replication") case <-ch: for _, fsmRaw := range c.fsms { @@ -354,7 +363,7 @@ func (c *cluster) pollState(s RaftState) ([]*Raft, uint64) { // GetInState polls the state of the cluster and attempts to identify when it has // settled into the given state. func (c *cluster) GetInState(s RaftState) []*Raft { - c.logger.Printf("[INFO] Starting stability test for raft state: %+v", s) + c.logger.Info("starting stability test", "raft-state", s) limitCh := time.After(c.longstopTimeout) // An election should complete after 2 * max(HeartbeatTimeout, ElectionTimeout) @@ -411,17 +420,18 @@ func (c *cluster) GetInState(s RaftState) []*Raft { c.t.FailNow() case <-limitCh: - c.FailNowf("[ERR] Timeout waiting for stable %s state", s) + c.FailNowf("timeout waiting for stable %s state", s) case <-c.WaitEventChan(filter, 0): - c.logger.Printf("[DEBUG] Resetting stability timeout") + c.logger.Debug("resetting stability timeout") case t, ok := <-timer.C: if !ok { - c.FailNowf("[ERR] Timer channel errored") + c.FailNowf("timer channel errored") } - c.logger.Printf("[INFO] Stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", - s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)) + + c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", + s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) return inState } } @@ -431,7 +441,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft { func (c *cluster) Leader() *Raft { leaders := c.GetInState(Leader) if len(leaders) != 1 { - c.FailNowf("[ERR] expected one leader: %v", leaders) + c.FailNowf("expected one leader: %v", leaders) } return leaders[0] } @@ -442,14 +452,14 @@ func (c *cluster) Followers() []*Raft { expFollowers := len(c.rafts) - 1 followers := c.GetInState(Follower) if len(followers) != expFollowers { - c.FailNowf("[ERR] timeout waiting for %d followers (followers are %v)", expFollowers, followers) + c.FailNowf("timeout waiting for %d followers (followers are %v)", expFollowers, followers) } return followers } // FullyConnect connects all the transports together. func (c *cluster) FullyConnect() { - c.logger.Printf("[DEBUG] Fully Connecting") + c.logger.Debug("fully connecting") for i, t1 := range c.trans { for j, t2 := range c.trans { if i != j { @@ -462,7 +472,7 @@ func (c *cluster) FullyConnect() { // Disconnect disconnects all transports from the given address. func (c *cluster) Disconnect(a ServerAddress) { - c.logger.Printf("[DEBUG] Disconnecting %v", a) + c.logger.Debug("disconnecting", "address", a) for _, t := range c.trans { if t.LocalAddr() == a { t.DisconnectAll() @@ -475,7 +485,7 @@ func (c *cluster) Disconnect(a ServerAddress) { // Partition keeps the given list of addresses connected but isolates them // from the other members of the cluster. func (c *cluster) Partition(far []ServerAddress) { - c.logger.Printf("[DEBUG] Partitioning %v", far) + c.logger.Debug("partitioning", "addresses", far) // Gather the set of nodes on the "near" side of the partition (we // will call the supplied list of nodes the "far" side). @@ -500,7 +510,7 @@ OUTER: t.Disconnect(a) } } else { - for a, _ := range near { + for a := range near { t.Disconnect(a) } } @@ -530,15 +540,15 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) { leader = "[none]" } if expect == "" { - c.logger.Printf("[ERR] Peer %s sees leader %v expected [none]", r, leader) + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", "[none]") } else { - c.logger.Printf("[ERR] Peer %s sees leader %v expected %v", r, leader, expect) + c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", expect) } fail = true } } if fail { - c.FailNowf("[ERR] At least one peer has the wrong notion of leader") + c.FailNowf("at least one peer has the wrong notion of leader") } } @@ -559,7 +569,7 @@ CHECK: if len(first.logs) != len(fsm.logs) { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM log length mismatch: %d %d", + c.FailNowf("FSM log length mismatch: %d %d", len(first.logs), len(fsm.logs)) } else { goto WAIT @@ -570,7 +580,7 @@ CHECK: if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM log mismatch at index %d", idx) + c.FailNowf("FSM log mismatch at index %d", idx) } else { goto WAIT } @@ -579,7 +589,7 @@ CHECK: if len(first.configurations) != len(fsm.configurations) { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM configuration length mismatch: %d %d", + c.FailNowf("FSM configuration length mismatch: %d %d", len(first.logs), len(fsm.logs)) } else { goto WAIT @@ -590,7 +600,7 @@ CHECK: if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) { fsm.Unlock() if time.Now().After(limit) { - c.FailNowf("[ERR] FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx]) + c.FailNowf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx]) } else { goto WAIT } @@ -613,7 +623,7 @@ WAIT: func (c *cluster) getConfiguration(r *Raft) Configuration { future := r.GetConfiguration() if err := future.Error(); err != nil { - c.FailNowf("[ERR] failed to get configuration: %v", err) + c.FailNowf("failed to get configuration: %v", err) return Configuration{} } @@ -634,7 +644,7 @@ CHECK: otherSet := c.getConfiguration(raft) if !reflect.DeepEqual(peerSet, otherSet) { if time.Now().After(limit) { - c.FailNowf("[ERR] peer mismatch: %+v %+v", peerSet, otherSet) + c.FailNowf("peer mismatch: %+v %+v", peerSet, otherSet) } else { goto WAIT } @@ -687,7 +697,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { for i := 0; i < opts.Peers; i++ { dir, err := ioutil.TempDir("", "raft") if err != nil { - c.FailNowf("[ERR] err: %v ", err) + c.FailNowf("err: %v", err) } store := NewInmemStore() @@ -742,18 +752,18 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster { if opts.Bootstrap { err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) if err != nil { - c.FailNowf("[ERR] BootstrapCluster failed: %v", err) + c.FailNowf("BootstrapCluster failed: %v", err) } } raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) if err != nil { - c.FailNowf("[ERR] NewRaft failed: %v", err) + c.FailNowf("NewRaft failed: %v", err) } raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { - c.FailNowf("[ERR] RegisterObserver failed: %v", err) + c.FailNowf("RegisterObserver failed: %v", err) } c.rafts = append(c.rafts, raft) } diff --git a/vendor/github.com/hashicorp/raft/testing_batch.go b/vendor/github.com/hashicorp/raft/testing_batch.go new file mode 100644 index 0000000000..afb2285614 --- /dev/null +++ b/vendor/github.com/hashicorp/raft/testing_batch.go @@ -0,0 +1,29 @@ +// +build batchtest + +package raft + +func init() { + userSnapshotErrorsOnNoData = false +} + +// ApplyBatch enables MockFSM to satisfy the BatchingFSM interface. This +// function is gated by the batchtest build flag. +// +// NOTE: This is exposed for middleware testing purposes and is not a stable API +func (m *MockFSM) ApplyBatch(logs []*Log) []interface{} { + m.Lock() + defer m.Unlock() + + ret := make([]interface{}, len(logs)) + for i, log := range logs { + switch log.Type { + case LogCommand: + m.logs = append(m.logs, log.Data) + ret[i] = len(m.logs) + default: + ret[i] = nil + } + } + + return ret +} diff --git a/vendor/github.com/hashicorp/raft/util.go b/vendor/github.com/hashicorp/raft/util.go index 90428d7437..6fec84a3ba 100644 --- a/vendor/github.com/hashicorp/raft/util.go +++ b/vendor/github.com/hashicorp/raft/util.go @@ -13,7 +13,7 @@ import ( ) func init() { - // Ensure we use a high-entropy seed for the psuedo-random generator + // Ensure we use a high-entropy seed for the pseudo-random generator rand.Seed(newSeed()) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3e56c89a04..d00810355c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -245,7 +245,7 @@ github.com/hashicorp/mdns github.com/hashicorp/memberlist # github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 github.com/hashicorp/net-rpc-msgpackrpc -# github.com/hashicorp/raft v1.1.1 +# github.com/hashicorp/raft v1.1.2 github.com/hashicorp/raft # github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/raft-boltdb