Make Raft trailing logs and snapshot timing reloadable (#10129)

* WIP reloadable raft config

* Pre-define new raft gauges

* Update go-metrics to change gauge reset behaviour

* Update raft to pull in new metric and reloadable config

* Add snapshot persistance timing and installSnapshot to our 'protected' list as they can be infrequent but are important

* Update telemetry docs

* Update config and telemetry docs

* Add note to oldestLogAge on when it is visible

* Add changelog entry

* Update website/content/docs/agent/options.mdx

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>
This commit is contained in:
Paul Banks 2021-05-04 15:36:53 +01:00 committed by hc-github-team-consul-core
parent 60163a13ce
commit fa1b308c7b
32 changed files with 805 additions and 258 deletions

4
.changelog/10129.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:improvement
raft: allow reloading of raft trailing logs and snapshot timing to allow recovery from some [replication failure modes](https://github.com/hashicorp/consul/issues/9609).
telemetry: add metrics and documentation for [monitoring for replication issues](https://consul.io/docs/agent/telemetry#raft-replication-capacity-issues).
```

View File

@ -3643,6 +3643,9 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
RPCMaxBurst: newCfg.RPCMaxBurst,
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
ConfigEntryBootstrap: newCfg.ConfigEntryBootstrap,
RaftSnapshotThreshold: newCfg.RaftSnapshotThreshold,
RaftSnapshotInterval: newCfg.RaftSnapshotInterval,
RaftTrailingLogs: newCfg.RaftTrailingLogs,
}
if err := a.delegate.ReloadConfig(cc); err != nil {
return err

View File

@ -663,4 +663,7 @@ type ReloadableConfig struct {
RPCMaxBurst int
RPCMaxConnsPerClient int
ConfigEntryBootstrap []structs.ConfigEntry
RaftSnapshotThreshold int
RaftSnapshotInterval time.Duration
RaftTrailingLogs int
}

View File

@ -1387,6 +1387,13 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
func (s *Server) ReloadConfig(config ReloadableConfig) error {
// Reload raft config first before updating any other state since it could
// error if the new config is invalid.
raftCfg := computeRaftReloadableConfig(config)
if err := s.raft.ReloadConfig(raftCfg); err != nil {
return err
}
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
s.rpcConnLimiter.SetConfig(connlimit.Config{
MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
@ -1401,6 +1408,33 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error {
return nil
}
// computeRaftReloadableConfig works out the correct reloadable config for raft.
// We reload raft even if nothing has changed since it's cheap and simpler than
// trying to work out if it's different from the current raft config. This
// function is separate to make it cheap to table test thoroughly without a full
// raft instance.
func computeRaftReloadableConfig(config ReloadableConfig) raft.ReloadableConfig {
// We use the raw defaults _not_ the current values so that you can reload
// back to a zero value having previously started Consul with a custom value
// for one of these fields.
defaultConf := DefaultConfig()
raftCfg := raft.ReloadableConfig{
TrailingLogs: defaultConf.RaftConfig.TrailingLogs,
SnapshotInterval: defaultConf.RaftConfig.SnapshotInterval,
SnapshotThreshold: defaultConf.RaftConfig.SnapshotThreshold,
}
if config.RaftSnapshotThreshold != 0 {
raftCfg.SnapshotThreshold = uint64(config.RaftSnapshotThreshold)
}
if config.RaftSnapshotInterval != 0 {
raftCfg.SnapshotInterval = config.RaftSnapshotInterval
}
if config.RaftTrailingLogs != 0 {
raftCfg.TrailingLogs = uint64(config.RaftTrailingLogs)
}
return raftCfg
}
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)

View File

@ -14,6 +14,7 @@ import (
"github.com/google/tcpproxy"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/ipaddr"
@ -1466,6 +1467,9 @@ func TestServer_ReloadConfig(t *testing.T) {
c.Build = "1.5.0"
c.RPCRateLimit = 500
c.RPCMaxBurst = 5000
// Set one raft param to be non-default in the initial config, others are
// default.
c.RaftConfig.TrailingLogs = 1234
})
defer os.RemoveAll(dir1)
defer s.Shutdown()
@ -1480,6 +1484,14 @@ func TestServer_ReloadConfig(t *testing.T) {
RPCRateLimit: 1000,
RPCMaxBurst: 10000,
ConfigEntryBootstrap: []structs.ConfigEntry{entryInit},
// Reset the custom one to default be removing it from config file (it will
// be a zero value here).
RaftTrailingLogs: 0,
// Set a different Raft param to something custom now
RaftSnapshotThreshold: 4321,
// Leave other raft fields default
}
require.NoError(t, s.ReloadConfig(rc))
@ -1496,6 +1508,98 @@ func TestServer_ReloadConfig(t *testing.T) {
limiter = s.rpcLimiter.Load().(*rate.Limiter)
require.Equal(t, rate.Limit(1000), limiter.Limit())
require.Equal(t, 10000, limiter.Burst())
// Check raft config
defaults := DefaultConfig()
got := s.raft.ReloadableConfig()
require.Equal(t, uint64(4321), got.SnapshotThreshold,
"should have be reloaded to new value")
require.Equal(t, defaults.RaftConfig.SnapshotInterval, got.SnapshotInterval,
"should have remained the default interval")
require.Equal(t, defaults.RaftConfig.TrailingLogs, got.TrailingLogs,
"should have reloaded to default trailing_logs")
// Now check that update each of those raft fields separately works correctly
// too.
}
func TestServer_computeRaftReloadableConfig(t *testing.T) {
defaults := DefaultConfig().RaftConfig
cases := []struct {
name string
rc ReloadableConfig
want raft.ReloadableConfig
}{
{
// This case is the common path - reload is called with a ReloadableConfig
// populated from the RuntimeConfig which has zero values for the fields.
// On startup we selectively pick non-zero runtime config fields to
// override defaults so we need to do the same.
name: "Still defaults",
rc: ReloadableConfig{},
want: raft.ReloadableConfig{
SnapshotThreshold: defaults.SnapshotThreshold,
SnapshotInterval: defaults.SnapshotInterval,
TrailingLogs: defaults.TrailingLogs,
},
},
{
name: "Threshold set",
rc: ReloadableConfig{
RaftSnapshotThreshold: 123456,
},
want: raft.ReloadableConfig{
SnapshotThreshold: 123456,
SnapshotInterval: defaults.SnapshotInterval,
TrailingLogs: defaults.TrailingLogs,
},
},
{
name: "interval set",
rc: ReloadableConfig{
RaftSnapshotInterval: 13 * time.Minute,
},
want: raft.ReloadableConfig{
SnapshotThreshold: defaults.SnapshotThreshold,
SnapshotInterval: 13 * time.Minute,
TrailingLogs: defaults.TrailingLogs,
},
},
{
name: "trailing logs set",
rc: ReloadableConfig{
RaftTrailingLogs: 78910,
},
want: raft.ReloadableConfig{
SnapshotThreshold: defaults.SnapshotThreshold,
SnapshotInterval: defaults.SnapshotInterval,
TrailingLogs: 78910,
},
},
{
name: "all set",
rc: ReloadableConfig{
RaftSnapshotThreshold: 123456,
RaftSnapshotInterval: 13 * time.Minute,
RaftTrailingLogs: 78910,
},
want: raft.ReloadableConfig{
SnapshotThreshold: 123456,
SnapshotInterval: 13 * time.Minute,
TrailingLogs: 78910,
},
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
got := computeRaftReloadableConfig(tc.rc)
require.Equal(t, tc.want, got)
})
}
}
func TestServer_RPC_RateLimit(t *testing.T) {

View File

@ -175,6 +175,19 @@ func registerWithGRPC(b grpcresolver.Builder) {
// getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends
// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics.
func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
// TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry
// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
raftGauges := []prometheus.GaugeDefinition{
{
Name: []string{"raft", "fsm", "lastRestoreDuration"},
Help: "This measures how long the last FSM restore (from disk or leader) took.",
},
{
Name: []string{"raft", "leader", "oldestLogAge"},
Help: "This measures how old the oldest log in the leader's log store is.",
},
}
// Build slice of slices for all gauge definitions
var gauges = [][]prometheus.GaugeDefinition{
cache.Gauges,
@ -185,7 +198,9 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
usagemetrics.Gauges,
consul.ReplicationGauges,
Gauges,
raftGauges,
}
// Flatten definitions
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
var gaugeDefs []prometheus.GaugeDefinition
@ -252,6 +267,14 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
Name: []string{"raft", "leader", "lastContact"},
Help: "Measures the time since the leader was last able to contact the follower nodes when checking its leader lease.",
},
{
Name: []string{"raft", "snapshot", "persist"},
Help: "Measures the time it takes raft to write a new snapshot to disk.",
},
{
Name: []string{"raft", "rpc", "installSnapshot"},
Help: "Measures the time it takes the raft leader to install a snapshot on a follower that is catching up after being down or has just joined the cluster.",
},
}
var summaries = [][]prometheus.SummaryDefinition{

4
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/Microsoft/go-winio v0.4.3 // indirect
github.com/NYTimes/gziphandler v1.0.1
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/go-metrics v0.3.6
github.com/armon/go-metrics v0.3.7
github.com/armon/go-radix v1.0.0
github.com/aws/aws-sdk-go v1.25.41
github.com/coredns/coredns v1.1.2
@ -52,7 +52,7 @@ require (
github.com/hashicorp/mdns v1.0.4 // indirect
github.com/hashicorp/memberlist v0.2.3
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.2.0
github.com/hashicorp/raft v1.3.0
github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.5

7
go.sum
View File

@ -58,8 +58,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs=
github.com/armon/go-metrics v0.3.6 h1:x/tmtOF9cDBoXH7XoAGOz2qqm1DknFD1590XmD/DUJ8=
github.com/armon/go-metrics v0.3.6/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.7 h1:c/oCtWzYpboy6+6f6LjXRlyW7NwA2SWf+a9KMlHq/bM=
github.com/armon/go-metrics v0.3.7/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
@ -279,8 +279,9 @@ github.com/hashicorp/memberlist v0.2.3/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.3.0 h1:Wox4J4R7J2FOJLtTa6hdk0VJfiNUSP32pYoYR738bkE=
github.com/hashicorp/raft v1.3.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=

View File

@ -5,7 +5,6 @@ package prometheus
import (
"fmt"
"log"
"math"
"regexp"
"strings"
"sync"
@ -31,13 +30,12 @@ type PrometheusOpts struct {
Expiration time.Duration
Registerer prometheus.Registerer
// Gauges, Summaries, and Counters allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to
// the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be
// deleted when their expiry is reached.
// - Gauges and Summaries will be set to NaN when they expire.
// - Counters continue to Collect their last known value.
// Ex:
// PrometheusOpts{
// Gauges, Summaries, and Counters allow us to pre-declare metrics by giving
// their Name, Help, and ConstLabels to the PrometheusSink when it is created.
// Metrics declared in this way will be initialized at zero and will not be
// deleted or altered when their expiry is reached.
//
// Ex: PrometheusOpts{
// Expiration: 10 * time.Second,
// Gauges: []GaugeDefinition{
// {
@ -139,21 +137,24 @@ func (p *PrometheusSink) Describe(c chan<- *prometheus.Desc) {
// logic to clean up ephemeral metrics if their value haven't been set for a
// duration exceeding our allowed expiration time.
func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
p.collectAtTime(c, time.Now())
}
// collectAtTime allows internal testing of the expiry based logic here without
// mocking clocks or making tests timing sensitive.
func (p *PrometheusSink) collectAtTime(c chan<- prometheus.Metric, t time.Time) {
expire := p.expiration != 0
now := time.Now()
p.gauges.Range(func(k, v interface{}) bool {
if v == nil {
return true
}
g := v.(*gauge)
lastUpdate := g.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if expire && lastUpdate.Add(p.expiration).Before(t) {
if g.canDelete {
p.gauges.Delete(k)
return true
}
// We have not observed the gauge this interval so we don't know its value.
g.Set(math.NaN())
}
g.Collect(c)
return true
@ -164,13 +165,11 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
}
s := v.(*summary)
lastUpdate := s.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if expire && lastUpdate.Add(p.expiration).Before(t) {
if s.canDelete {
p.summaries.Delete(k)
return true
}
// We have observed nothing in this interval.
s.Observe(math.NaN())
}
s.Collect(c)
return true
@ -181,12 +180,11 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
}
count := v.(*counter)
lastUpdate := count.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if expire && lastUpdate.Add(p.expiration).Before(t) {
if count.canDelete {
p.counters.Delete(k)
return true
}
// Counters remain at their previous value when not observed, so we do not set it to NaN.
}
count.Collect(c)
return true

View File

@ -1,5 +1,21 @@
# UNRELEASED
# 1.3.0 (April 22nd, 2021)
IMPROVEMENTS
* Added metrics for `oldestLogAge` and `lastRestoreDuration` to monitor capacity issues that can cause unrecoverable cluster failure [[GH-452](https://github.com/hashicorp/raft/pull/452)][[GH-454](https://github.com/hashicorp/raft/pull/454/files)]
* Made `TrailingLogs`, `SnapshotInterval` and `SnapshotThreshold` reloadable at runtime using a new `ReloadConfig` method. This allows recovery from cases where there are not enough logs retained for followers to catchup after a restart. [[GH-444](https://github.com/hashicorp/raft/pull/444)]
* Inclusify the repository by switching to main [[GH-446](https://github.com/hashicorp/raft/pull/446)]
* Add option for a buffered `ApplyCh` if `MaxAppendEntries` is enabled [[GH-445](https://github.com/hashicorp/raft/pull/445)]
* Add string to `LogType` for more human readable debugging [[GH-442](https://github.com/hashicorp/raft/pull/442)]
* Extract fuzzy testing into its own module [[GH-459](https://github.com/hashicorp/raft/pull/459)]
BUG FIXES
* Update LogCache `StoreLogs()` to capture an error that would previously cause a panic [[GH-460](https://github.com/hashicorp/raft/pull/460)]
# 1.2.0 (October 5th, 2020)
IMPROVEMENTS
* Remove `StartAsLeader` configuration option [[GH-364](https://github.com/hashicorp/raft/pull/386)]
@ -85,4 +101,4 @@ v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. Th
# 0.1.0 (September 29th, 2017)
v0.1.0 is the original stable version of the library that was in master and has been maintained with no breaking API changes. This was in use by Consul prior to version 0.7.0.
v0.1.0 is the original stable version of the library that was in main and has been maintained with no breaking API changes. This was in use by Consul prior to version 0.7.0.

View File

@ -16,28 +16,28 @@ endif
TEST_RESULTS_DIR?=/tmp/test-results
test:
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=180s -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=180s -tags batchtest -race .
integ: test
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=60s -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=60s -tags batchtest -run=Integ .
ci.test-norace:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s -tags batchtest
ci.test:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race .
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest .
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s -race .
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s -race -tags batchtest .
ci.integ: ci.test
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ .
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest .
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=60s -run=Integ .
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=60s -run=Integ -tags batchtest .
fuzz:
go test $(TESTARGS) -timeout=20m ./fuzzy
go test $(TESTARGS) -timeout=20m -tags batchtest ./fuzzy
cd ./fuzzy && go test $(TESTARGS) -timeout=20m .
cd ./fuzzy && go test $(TESTARGS) -timeout=20m -tags batchtest .
deps:
go get -t -d -v ./...

View File

@ -28,16 +28,21 @@ To prevent complications with cgo, the primary backend `MDBStore` is in a separa
called [raft-mdb](http://github.com/hashicorp/raft-mdb). That is the recommended implementation
for the `LogStore` and `StableStore`.
A pure Go backend using [BoltDB](https://github.com/boltdb/bolt) is also available called
A pure Go backend using [Bbolt](https://github.com/etcd-io/bbolt) is also available called
[raft-boltdb](https://github.com/hashicorp/raft-boltdb). It can also be used as a `LogStore`
and `StableStore`.
## Community Contributed Examples
[Raft gRPC Example](https://github.com/Jille/raft-grpc-example) - Utilizing the Raft repository with gRPC
## Tagged Releases
As of September 2017, HashiCorp will start using tags for this library to clearly indicate
major version updates. We recommend you vendor your application's dependency on this library.
* v0.1.0 is the original stable version of the library that was in master and has been maintained
* v0.1.0 is the original stable version of the library that was in main and has been maintained
with no breaking API changes. This was in use by Consul prior to version 0.7.0.
* v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version
@ -104,4 +109,3 @@ greatly sacrificing performance.
In terms of performance, Raft is comparable to Paxos. Assuming stable leadership,
committing a log entry requires a single round trip to half of the cluster.
Thus performance is bound by disk I/O and network latency.

View File

@ -81,8 +81,15 @@ type Raft struct {
// be committed and applied to the FSM.
applyCh chan *logFuture
// Configuration provided at Raft initialization
conf Config
// conf stores the current configuration to use. This is the most recent one
// provided. All reads of config values should use the config() helper method
// to read this safely.
conf atomic.Value
// confReloadMu ensures that only one thread can reload config at once since
// we need to read-modify-write the atomic. It is NOT necessary to hold this
// for any other operation e.g. reading config using config().
confReloadMu sync.Mutex
// FSM is the client state machine to apply commands to
fsm FSM
@ -199,7 +206,7 @@ type Raft struct {
// server. Any further attempts to bootstrap will return an error that can be
// safely ignored.
//
// One sane approach is to bootstrap a single server with a configuration
// One approach is to bootstrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
@ -316,6 +323,12 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
continue
}
// Note this is the one place we call fsm.Restore without the
// fsmRestoreAndMeasure wrapper since this function should only be called to
// reset state on disk and the FSM passed will not be used for a running
// server instance. If the same process will eventually become a Raft peer
// then it will call NewRaft and restore again from disk then which will
// report metrics.
err = fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
@ -385,9 +398,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return nil
}
// GetConfiguration returns the configuration of the Raft cluster without
// starting a Raft instance or connecting to the cluster
// This function has identical behavior to Raft.GetConfiguration
// GetConfiguration returns the persisted configuration of the Raft cluster
// without starting a Raft instance or connecting to the cluster. This function
// has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
conf.skipStartup = true
@ -486,7 +499,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// Make sure we have a valid server address and ID.
protocolVersion := conf.ProtocolVersion
localAddr := ServerAddress(trans.LocalAddr())
localAddr := trans.LocalAddr()
localID := conf.LocalID
// TODO (slackpad) - When we deprecate protocol version 2, remove this
@ -495,11 +508,16 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address")
}
// Buffer applyCh to MaxAppendEntries if the option is enabled
applyCh := make(chan *logFuture)
if conf.BatchApplyCh {
applyCh = make(chan *logFuture, conf.MaxAppendEntries)
}
// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
applyCh: applyCh,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
@ -524,6 +542,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
}
r.conf.Store(*conf)
// Initialize as a follower.
r.setState(Follower)
@ -577,23 +597,23 @@ func (r *Raft) restoreSnapshot() error {
// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
if !r.conf.NoSnapshotRestoreOnStart {
if !r.config().NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
continue
}
err = r.fsm.Restore(source)
// Close the source after the restore has completed
if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
source.Close()
if err != nil {
r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
continue
}
source.Close()
r.logger.Info("restored from snapshot", "id", snapshot.ID)
}
// Update the lastApplied so we don't replay old logs
r.setLastApplied(snapshot.Index)
@ -624,6 +644,45 @@ func (r *Raft) restoreSnapshot() error {
return nil
}
func (r *Raft) config() Config {
return r.conf.Load().(Config)
}
// ReloadConfig updates the configuration of a running raft node. If the new
// configuration is invalid an error is returned and no changes made to the
// instance. All fields will be copied from rc into the new configuration, even
// if they are zero valued.
func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
r.confReloadMu.Lock()
defer r.confReloadMu.Unlock()
// Load the current config (note we are under a lock so it can't be changed
// between this read and a later Store).
oldCfg := r.config()
// Set the reloadable fields
newCfg := rc.apply(oldCfg)
if err := ValidateConfig(&newCfg); err != nil {
return err
}
r.conf.Store(newCfg)
return nil
}
// ReloadableConfig returns the current state of the reloadable fields in Raft's
// configuration. This is useful for programs to discover the current state for
// reporting to users or tests. It is safe to call from any goroutine. It is
// intended for reporting and testing purposes primarily; external
// synchronization would be required to safely use this in a read-modify-write
// pattern for reloadable configuration options.
func (r *Raft) ReloadableConfig() ReloadableConfig {
cfg := r.config()
var rc ReloadableConfig
rc.fromConfig(cfg)
return rc
}
// BootstrapCluster is equivalent to non-member BootstrapCluster but can be
// called on an un-bootstrapped Raft instance after it has been created. This
// should only be called at the beginning of time for the cluster with an

View File

@ -23,7 +23,7 @@ type commitment struct {
startIndex uint64
}
// newCommitment returns an commitment struct that notifies the provided
// newCommitment returns a commitment struct that notifies the provided
// channel when log entries have been committed. A new commitment struct is
// created each time this server becomes leader for a particular term.
// 'configuration' is the servers in the cluster.

View File

@ -151,25 +151,36 @@ type Config struct {
// an inconsistent log.
MaxAppendEntries int
// BatchApplyCh indicates whether we should buffer applyCh
// to size MaxAppendEntries. This enables batch log commitment,
// but breaks the timeout guarantee on Apply. Specifically,
// a log can be added to the applyCh buffer but not actually be
// processed until after the specified timeout.
BatchApplyCh bool
// If we are a member of a cluster, and RemovePeer is invoked for the
// local node, then we forget all peers and transition into the follower state.
// If ShutdownOnRemove is is set, we additional shutdown Raft. Otherwise,
// If ShutdownOnRemove is set, we additional shutdown Raft. Otherwise,
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool
// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here is the initial setting used.
// This can be tuned during operation using ReloadConfig.
TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
// SnapshotInterval controls how often we check if we should perform a
// snapshot. We randomly stagger between this value and 2x this value to avoid
// the entire cluster from performing a snapshot at once. The value passed
// here is the initial setting used. This can be tuned during operation using
// ReloadConfig.
SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
// we perform a snapshot. This is to prevent excessive snapshotting by
// replaying a small set of logs instead. The value passed here is the initial
// setting used. This can be tuned during operation using ReloadConfig.
SnapshotThreshold uint64
// LeaderLeaseTimeout is used to control how long the "lease" lasts
@ -178,7 +189,7 @@ type Config struct {
// step down as leader.
LeaderLeaseTimeout time.Duration
// The unique ID for this server across all time. When running with
// LocalID is a unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport.
LocalID ServerID
@ -192,25 +203,65 @@ type Config struct {
// Defaults to os.Stderr.
LogOutput io.Writer
// LogLevel represents a log level. If a no matching string is specified,
// hclog.NoLevel is assumed.
// LogLevel represents a log level. If the value does not match a known
// logging level hclog.NoLevel is used.
LogLevel string
// Logger is a user-provided hc-log logger. If nil, a logger writing to
// Logger is a user-provided logger. If nil, a logger writing to
// LogOutput with LogLevel is used.
Logger hclog.Logger
// NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the
// FSM on start. This is useful if your FSM recovers from other mechanisms
// than raft snapshotting. Snapshot metadata will still be used to initialize
// raft's configuration and index values. This is used in NewRaft and
// RestoreCluster.
// raft's configuration and index values.
NoSnapshotRestoreOnStart bool
// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
// Reconfiguring some fields is potentially dangerous so we should only
// selectively enable it for fields where that is allowed.
type ReloadableConfig struct {
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here updates the setting at runtime
// which will take effect as soon as the next snapshot completes and truncation
// occurs.
TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64
}
// apply sets the reloadable fields on the passed Config to the values in
// `ReloadableConfig`. It returns a copy of Config with the fields from this
// ReloadableConfig set.
func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
return to
}
// fromConfig copies the reloadable fields from the passed Config.
func (rc *ReloadableConfig) fromConfig(from Config) {
rc.TrailingLogs = from.TrailingLogs
rc.SnapshotInterval = from.SnapshotInterval
rc.SnapshotThreshold = from.SnapshotThreshold
}
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
@ -238,20 +289,20 @@ func ValidateConfig(config *Config) error {
}
if config.ProtocolVersion < protocolMin ||
config.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version %d must be >= %d and <= %d",
return fmt.Errorf("ProtocolVersion %d must be >= %d and <= %d",
config.ProtocolVersion, protocolMin, ProtocolVersionMax)
}
if len(config.LocalID) == 0 {
return fmt.Errorf("LocalID cannot be empty")
}
if config.HeartbeatTimeout < 5*time.Millisecond {
return fmt.Errorf("Heartbeat timeout is too low")
return fmt.Errorf("HeartbeatTimeout is too low")
}
if config.ElectionTimeout < 5*time.Millisecond {
return fmt.Errorf("Election timeout is too low")
return fmt.Errorf("ElectionTimeout is too low")
}
if config.CommitTimeout < time.Millisecond {
return fmt.Errorf("Commit timeout is too low")
return fmt.Errorf("CommitTimeout is too low")
}
if config.MaxAppendEntries <= 0 {
return fmt.Errorf("MaxAppendEntries must be positive")
@ -260,16 +311,16 @@ func ValidateConfig(config *Config) error {
return fmt.Errorf("MaxAppendEntries is too large")
}
if config.SnapshotInterval < 5*time.Millisecond {
return fmt.Errorf("Snapshot interval is too low")
return fmt.Errorf("SnapshotInterval is too low")
}
if config.LeaderLeaseTimeout < 5*time.Millisecond {
return fmt.Errorf("Leader lease timeout is too low")
return fmt.Errorf("LeaderLeaseTimeout is too low")
}
if config.LeaderLeaseTimeout > config.HeartbeatTimeout {
return fmt.Errorf("Leader lease timeout cannot be larger than heartbeat timeout")
return fmt.Errorf("LeaderLeaseTimeout cannot be larger than heartbeat timeout")
}
if config.ElectionTimeout < config.HeartbeatTimeout {
return fmt.Errorf("Election timeout must be equal or greater than Heartbeat Timeout")
return fmt.Errorf("ElectionTimeout must be equal or greater than Heartbeat Timeout")
}
return nil
}

View File

@ -181,17 +181,17 @@ func checkConfiguration(configuration Configuration) error {
var voters int
for _, server := range configuration.Servers {
if server.ID == "" {
return fmt.Errorf("Empty ID in configuration: %v", configuration)
return fmt.Errorf("empty ID in configuration: %v", configuration)
}
if server.Address == "" {
return fmt.Errorf("Empty address in configuration: %v", server)
return fmt.Errorf("empty address in configuration: %v", server)
}
if idSet[server.ID] {
return fmt.Errorf("Found duplicate ID in configuration: %v", server.ID)
return fmt.Errorf("found duplicate ID in configuration: %v", server.ID)
}
idSet[server.ID] = true
if addressSet[server.Address] {
return fmt.Errorf("Found duplicate address in configuration: %v", server.Address)
return fmt.Errorf("found duplicate address in configuration: %v", server.Address)
}
addressSet[server.Address] = true
if server.Suffrage == Voter {
@ -199,7 +199,7 @@ func checkConfiguration(configuration Configuration) error {
}
}
if voters == 0 {
return fmt.Errorf("Need at least one voter in configuration: %v", configuration)
return fmt.Errorf("need at least one voter in configuration: %v", configuration)
}
return nil
}
@ -209,7 +209,7 @@ func checkConfiguration(configuration Configuration) error {
// that it can be unit tested easily.
func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) {
if change.prevIndex > 0 && change.prevIndex != currentIndex {
return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
return Configuration{}, fmt.Errorf("configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
}
configuration := current.Clone()

View File

@ -175,16 +175,13 @@ func (r *Raft) runFSM() {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return
}
defer source.Close()
// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
return
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
// Update the last index and term
lastIndex = meta.Index
@ -233,3 +230,17 @@ func (r *Raft) runFSM() {
}
}
}
// fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure
// and report timing metrics. The caller is still responsible for calling Close
// on the source in all cases.
func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error {
start := time.Now()
if err := fsm.Restore(source); err != nil {
return err
}
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"},
float32(time.Since(start).Milliseconds()))
return nil
}

View File

@ -9,12 +9,13 @@ import (
// Future is used to represent an action that may occur in the future.
type Future interface {
// Error blocks until the future arrives and then
// returns the error status of the future.
// This may be called any number of times - all
// calls will return the same value.
// Note that it is not OK to call this method
// twice concurrently on the same Future instance.
// Error blocks until the future arrives and then returns the error status
// of the future. This may be called any number of times - all calls will
// return the same value, however is not OK to call this method twice
// concurrently on the same Future instance.
// Error will only return generic errors related to raft, such
// as ErrLeadershipLost, or ErrRaftShutdown. Some operations, such as
// ApplyLog, may also return errors from other methods.
Error() error
}
@ -32,9 +33,11 @@ type IndexFuture interface {
type ApplyFuture interface {
IndexFuture
// Response returns the FSM response as returned
// by the FSM.Apply method. This must not be called
// until after the Error method has returned.
// Response returns the FSM response as returned by the FSM.Apply method. This
// must not be called until after the Error method has returned.
// Note that if FSM.Apply returns an error, it will be returned by Response,
// and not by the Error method, so it is always important to check Response
// for errors from the FSM.
Response() interface{}
}

View File

@ -4,10 +4,7 @@ go 1.12
require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/boltdb/bolt v1.3.1 // indirect
github.com/hashicorp/go-hclog v0.9.1
github.com/hashicorp/go-msgpack v0.5.5
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/stretchr/testify v1.3.0
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 // indirect
)

View File

@ -2,8 +2,6 @@ github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -22,8 +20,6 @@ github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCS
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@ -41,5 +37,3 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -1,5 +1,12 @@
package raft
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
)
// LogType describes various types of log entries.
type LogType uint8
@ -33,6 +40,26 @@ const (
LogConfiguration
)
// String returns LogType as a human readable string.
func (lt LogType) String() string {
switch lt {
case LogCommand:
return "LogCommand"
case LogNoop:
return "LogNoop"
case LogAddPeerDeprecated:
return "LogAddPeerDeprecated"
case LogRemovePeerDeprecated:
return "LogRemovePeerDeprecated"
case LogBarrier:
return "LogBarrier"
case LogConfiguration:
return "LogConfiguration"
default:
return fmt.Sprintf("%d", lt)
}
}
// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
type Log struct {
@ -62,6 +89,19 @@ type Log struct {
// trouble, so gating extension behavior via some flag in the client
// program is also a good idea.
Extensions []byte
// AppendedAt stores the time the leader first appended this log to it's
// LogStore. Followers will observe the leader's time. It is not used for
// coordination or as part of the replication protocol at all. It exists only
// to provide operational information for example how many seconds worth of
// logs are present on the leader which might impact follower's ability to
// catch up after restoring a large snapshot. We should never rely on this
// being in the past when appending on a follower or reading a log back since
// the clock skew can mean a follower could see a log with a future timestamp.
// In general too the leader is not required to persist the log before
// delivering to followers although the current implementation happens to do
// this.
AppendedAt time.Time
}
// LogStore is used to provide an interface for storing
@ -85,3 +125,52 @@ type LogStore interface {
// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}
func oldestLog(s LogStore) (Log, error) {
var l Log
// We might get unlucky and have a truncate right between getting first log
// index and fetching it so keep trying until we succeed or hard fail.
var lastFailIdx uint64
var lastErr error
for {
firstIdx, err := s.FirstIndex()
if err != nil {
return l, err
}
if firstIdx == 0 {
return l, ErrLogNotFound
}
if firstIdx == lastFailIdx {
// Got same index as last time around which errored, don't bother trying
// to fetch it again just return the error.
return l, lastErr
}
err = s.GetLog(firstIdx, &l)
if err == nil {
// We found the oldest log, break the loop
break
}
// We failed, keep trying to see if there is a new firstIndex
lastFailIdx = firstIdx
lastErr = err
}
return l, nil
}
func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(interval):
// In error case emit 0 as the age
ageMs := float32(0.0)
l, err := oldestLog(s)
if err == nil && !l.AppendedAt.IsZero() {
ageMs = float32(time.Since(l.AppendedAt).Milliseconds())
}
metrics.SetGauge(append(prefix, "oldestLogAge"), ageMs)
case <-stopCh:
return
}
}
}

View File

@ -51,14 +51,17 @@ func (c *LogCache) StoreLog(log *Log) error {
}
func (c *LogCache) StoreLogs(logs []*Log) error {
// Insert the logs into the ring buffer
err := c.store.StoreLogs(logs)
// Insert the logs into the ring buffer, but only on success
if err != nil {
return fmt.Errorf("unable to store logs within log store, err: %q", err)
}
c.l.Lock()
for _, l := range logs {
c.cache[l.Index%uint64(len(c.cache))] = l
}
c.l.Unlock()
return c.store.StoreLogs(logs)
return nil
}
func (c *LogCache) FirstIndex() (uint64, error) {

View File

@ -5,13 +5,13 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"io"
"net"
"os"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
)
@ -122,7 +122,6 @@ type StreamLayer interface {
type netConn struct {
target ServerAddress
conn net.Conn
r *bufio.Reader
w *bufio.Writer
dec *codec.Decoder
enc *codec.Encoder
@ -344,12 +343,10 @@ func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
netConn := &netConn{
target: target,
conn: conn,
r: bufio.NewReader(conn),
dec: codec.NewDecoder(bufio.NewReader(conn), &codec.MsgpackHandle{}),
w: bufio.NewWriter(conn),
}
// Setup encoder/decoders
netConn.dec = codec.NewDecoder(netConn.r, &codec.MsgpackHandle{})
netConn.enc = codec.NewEncoder(netConn.w, &codec.MsgpackHandle{})
// Done

View File

@ -2,6 +2,7 @@ package raft
import (
"sync/atomic"
"time"
)
// Observation is sent along the given channel to observers when an event occurs.
@ -27,6 +28,12 @@ type PeerObservation struct {
Peer Server
}
// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader
type FailedHeartbeatObservation struct {
PeerID ServerID
LastContact time.Time
}
// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration.
var nextObserverID uint64

View File

@ -16,6 +16,7 @@ import (
const (
minCheckInterval = 10 * time.Millisecond
oldestLogGaugeInterval = 10 * time.Second
)
var (
@ -29,7 +30,7 @@ var (
// responses.
func (r *Raft) getRPCHeader() RPCHeader {
return RPCHeader{
ProtocolVersion: r.conf.ProtocolVersion,
ProtocolVersion: r.config().ProtocolVersion,
}
}
@ -56,7 +57,7 @@ func (r *Raft) checkRPCHeader(rpc RPC) error {
// currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol
// changes evolve.
if header.ProtocolVersion < r.conf.ProtocolVersion-1 {
if header.ProtocolVersion < r.config().ProtocolVersion-1 {
return ErrUnsupportedProtocol
}
@ -151,7 +152,7 @@ func (r *Raft) runFollower() {
didWarn := false
r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)
for r.getState() == Follower {
select {
@ -187,11 +188,12 @@ func (r *Raft) runFollower() {
case <-heartbeatTimer:
// Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
hbTimeout := r.config().HeartbeatTimeout
heartbeatTimer = randomTimeout(hbTimeout)
// Check if we have had a successful contact
lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
if time.Now().Sub(lastContact) < hbTimeout {
continue
}
@ -228,7 +230,8 @@ func (r *Raft) runFollower() {
// called on the main thread, and only makes sense in the follower state.
func (r *Raft) liveBootstrap(configuration Configuration) error {
// Use the pre-init API to make the static updates.
err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots,
cfg := r.config()
err := BootstrapCluster(&cfg, r.logs, r.stable, r.snapshots,
r.trans, configuration)
if err != nil {
return err
@ -260,7 +263,7 @@ func (r *Raft) runCandidate() {
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()
electionTimer := randomTimeout(r.conf.ElectionTimeout)
electionTimer := randomTimeout(r.config().ElectionTimeout)
// Tally the votes, need a simple majority
grantedVotes := 0
@ -344,10 +347,7 @@ func (r *Raft) setLeadershipTransferInProgress(v bool) {
func (r *Raft) getLeadershipTransferInProgress() bool {
v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress)
if v == 1 {
return true
}
return false
return v == 1
}
func (r *Raft) setupLeaderState() {
@ -370,8 +370,13 @@ func (r *Raft) runLeader() {
// Notify that we are the leader
overrideNotifyBool(r.leaderCh, true)
// Store the notify chan. It's not reloadable so shouldn't change before the
// defer below runs, but this makes sure we always notify the same chan if
// ever for both gaining and loosing leadership.
notify := r.config().NotifyCh
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
if notify != nil {
select {
case notify <- true:
case <-r.shutdownCh:
@ -382,8 +387,14 @@ func (r *Raft) runLeader() {
// leaderloop.
r.setupLeaderState()
// Run a background go-routine to emit metrics on log age
stopCh := make(chan struct{})
go emitLogStoreMetrics(r.logs, []string{"raft", "leader"}, oldestLogGaugeInterval, stopCh)
// Cleanup state on step down
defer func() {
close(stopCh)
// Since we were the leader previously, we update our
// last contact time when we step down, so that we are not
// reporting a last contact time from before we were the
@ -427,7 +438,7 @@ func (r *Raft) runLeader() {
overrideNotifyBool(r.leaderCh, false)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
if notify != nil {
select {
case notify <- false:
case <-r.shutdownCh:
@ -548,7 +559,9 @@ func (r *Raft) leaderLoop() {
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
// This is only used for the first lease check, we reload lease below
// based on the current config value.
lease := time.After(r.config().LeaderLeaseTimeout)
for r.getState() == Leader {
select {
@ -583,7 +596,7 @@ func (r *Raft) leaderLoop() {
// the stopCh and doneCh.
go func() {
select {
case <-time.After(r.conf.ElectionTimeout):
case <-time.After(r.config().ElectionTimeout):
close(stopCh)
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
@ -680,7 +693,7 @@ func (r *Raft) leaderLoop() {
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))
if stepDown {
if r.conf.ShutdownOnRemove {
if r.config().ShutdownOnRemove {
r.logger.Info("removed ourself, shutting down")
r.Shutdown()
} else {
@ -751,7 +764,7 @@ func (r *Raft) leaderLoop() {
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
for i := 0; i < r.config().MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
@ -776,7 +789,7 @@ func (r *Raft) leaderLoop() {
// Next check interval should adjust for the last node we've
// contacted, without going negative
checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
checkInterval := r.config().LeaderLeaseTimeout - maxDiff
if checkInterval < minCheckInterval {
checkInterval = minCheckInterval
}
@ -872,6 +885,11 @@ func (r *Raft) checkLeaderLease() time.Duration {
// Track contacted nodes, we can always contact ourself
contacted := 0
// Store lease timeout for this one check invocation as we need to refer to it
// in the loop and would be confusing if it ever becomes reloadable and
// changes between iterations below.
leaseTimeout := r.config().LeaderLeaseTimeout
// Check each follower
var maxDiff time.Duration
now := time.Now()
@ -883,14 +901,14 @@ func (r *Raft) checkLeaderLease() time.Duration {
}
f := r.leaderState.replState[server.ID]
diff := now.Sub(f.LastContact())
if diff <= r.conf.LeaderLeaseTimeout {
if diff <= leaseTimeout {
contacted++
if diff > maxDiff {
maxDiff = diff
}
} else {
// Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout {
if diff <= 3*leaseTimeout {
r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else {
r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
@ -1080,6 +1098,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
applyLog.log.AppendedAt = now
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
@ -1131,7 +1150,11 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
}
}
batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Store maxAppendEntries for this call in case it ever becomes reloadable. We
// need to use the same value for all lines here to get the expected result.
maxAppendEntries := r.config().MaxAppendEntries
batch := make([]*commitTuple, 0, maxAppendEntries)
// Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ {
@ -1156,9 +1179,9 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
if len(batch) >= maxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
batch = make([]*commitTuple, 0, maxAppendEntries)
}
case futureOk:
@ -1282,7 +1305,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
}
// Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader)))
r.setLeader(r.trans.DecodePeer(a.Leader))
// Verify the last log entry
if a.PrevLogEntry > 0 {
@ -1542,7 +1565,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
}
// Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(req.Leader)))
r.setLeader(r.trans.DecodePeer(req.Leader))
// Create a new snapshot
var reqConfiguration Configuration
@ -1738,16 +1761,6 @@ func (r *Raft) setState(state RaftState) {
}
}
// LookupServer looks up a server by ServerID.
func (r *Raft) lookupServer(id ServerID) *Server {
for _, server := range r.configurations.latest.Servers {
if server.ID != r.localID {
return &server
}
}
return nil
}
// pickServer returns the follower that is most up to date and participating in quorum.
// Because it accesses leaderstate, it should only be called from the leaderloop.
func (r *Raft) pickServer() *Server {

View File

@ -161,7 +161,7 @@ RPC:
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout):
case <-randomTimeout(r.config().CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}
@ -373,7 +373,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
// Wait for the next heartbeat interval or forced notify
select {
case <-s.notifyCh:
case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
case <-randomTimeout(r.config().HeartbeatTimeout / 10):
case <-stopCh:
return
}
@ -381,6 +381,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
r.observe(FailedHeartbeatObservation{PeerID: s.peer.ID, LastContact: s.LastContact()})
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
@ -447,7 +448,7 @@ SEND:
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout):
case <-randomTimeout(r.config().CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
}
@ -562,9 +563,12 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error
// setNewLogs is used to setup the logs which should be appended for a request.
func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
// Append up to MaxAppendEntries or up to the lastIndex
req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries)
maxIndex := min(nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex)
// Append up to MaxAppendEntries or up to the lastIndex. we need to use a
// consistent value for maxAppendEntries in the lines below in case it ever
// becomes reloadable.
maxAppendEntries := r.config().MaxAppendEntries
req.Entries = make([]*Log, 0, maxAppendEntries)
maxIndex := min(nextIndex+uint64(maxAppendEntries)-1, lastIndex)
for i := nextIndex; i <= maxIndex; i++ {
oldLog := new(Log)
if err := r.logs.GetLog(i, oldLog); err != nil {

View File

@ -69,7 +69,7 @@ type SnapshotSink interface {
func (r *Raft) runSnapshots() {
for {
select {
case <-randomTimeout(r.conf.SnapshotInterval):
case <-randomTimeout(r.config().SnapshotInterval):
// Check if we should snapshot
if !r.shouldSnapshot() {
continue
@ -113,7 +113,7 @@ func (r *Raft) shouldSnapshot() bool {
// Compare the delta to the threshold
delta := lastIdx - lastSnap
return delta >= r.conf.SnapshotThreshold
return delta >= r.config().SnapshotThreshold
}
// takeSnapshot is used to take a new snapshot. This must only be called from
@ -219,7 +219,11 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// Check if we have enough logs to truncate
lastLogIdx, _ := r.getLastLog()
if lastLogIdx <= r.conf.TrailingLogs {
// Use a consistent value for trailingLogs for the duration of this method
// call to avoid surprising behaviour.
trailingLogs := r.config().TrailingLogs
if lastLogIdx <= trailingLogs {
return nil
}
@ -227,7 +231,7 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// back from the head, which ever is further back. This ensures
// at least `TrailingLogs` entries, but does not allow logs
// after the snapshot to be removed.
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
maxLog := min(snapIdx, lastLogIdx-trailingLogs)
if minLog > maxLog {
r.logger.Info("no logs to truncate")

View File

@ -11,6 +11,6 @@ fi
# Generate the tag.
echo "==> Tagging version $VERSION..."
git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION"
git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master
git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" main
exit 0

View File

@ -27,7 +27,7 @@ func inmemConfig(t *testing.T) *Config {
conf.ElectionTimeout = 50 * time.Millisecond
conf.LeaderLeaseTimeout = 50 * time.Millisecond
conf.CommitTimeout = 5 * time.Millisecond
conf.Logger = newTestLeveledLogger(t)
conf.Logger = newTestLogger(t)
return conf
}
@ -144,9 +144,6 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) {
}
if a.prefix != "" {
l := a.prefix + ": " + string(d)
if testing.Verbose() {
fmt.Printf("testLoggerAdapter verbose: %s\n", l)
}
a.t.Log(l)
return len(l), nil
}
@ -156,27 +153,21 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) {
}
func newTestLogger(t *testing.T) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Output: &testLoggerAdapter{t: t},
Level: hclog.DefaultLevel,
})
return newTestLoggerWithPrefix(t, "")
}
// newTestLoggerWithPrefix returns a Logger that can be used in tests. prefix will
// be added as the name of the logger.
//
// If tests are run with -v (verbose mode, or -json which implies verbose) the
// log output will go to stderr directly.
// If tests are run in regular "quiet" mode, logs will be sent to t.Log so that
// the logs only appear when a test fails.
func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Output: &testLoggerAdapter{t: t, prefix: prefix},
Level: hclog.DefaultLevel,
})
}
if testing.Verbose() {
return hclog.New(&hclog.LoggerOptions{Name: prefix})
}
func newTestLeveledLogger(t *testing.T) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Name: "",
Output: &testLoggerAdapter{t: t},
})
}
func newTestLeveledLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Name: prefix,
Output: &testLoggerAdapter{t: t, prefix: prefix},
@ -243,8 +234,8 @@ func (c *cluster) Failf(format string, args ...interface{}) {
// other goroutines created during the test. Calling FailNowf does not stop
// those other goroutines.
func (c *cluster) FailNowf(format string, args ...interface{}) {
c.logger.Error(fmt.Sprintf(format, args...))
c.t.FailNow()
c.t.Helper()
c.t.Fatalf(format, args...)
}
// Close shuts down the cluster and cleans up.
@ -264,7 +255,7 @@ func (c *cluster) Close() {
for _, f := range futures {
if err := f.Error(); err != nil {
c.FailNowf("shutdown future err: %v", err)
c.t.Fatalf("shutdown future err: %v", err)
}
}
@ -325,7 +316,7 @@ CHECK:
c.t.FailNow()
case <-limitCh:
c.FailNowf("timeout waiting for replication")
c.t.Fatalf("timeout waiting for replication")
case <-ch:
for _, fsmRaw := range c.fsms {
@ -423,14 +414,14 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
c.t.FailNow()
case <-limitCh:
c.FailNowf("timeout waiting for stable %s state", s)
c.t.Fatalf("timeout waiting for stable %s state", s)
case <-eventCh:
c.logger.Debug("resetting stability timeout")
case t, ok := <-timer.C:
if !ok {
c.FailNowf("timer channel errored")
c.t.Fatalf("timer channel errored")
}
c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
@ -442,9 +433,10 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
// Leader waits for the cluster to elect a leader and stay in a stable state.
func (c *cluster) Leader() *Raft {
c.t.Helper()
leaders := c.GetInState(Leader)
if len(leaders) != 1 {
c.FailNowf("expected one leader: %v", leaders)
c.t.Fatalf("expected one leader: %v", leaders)
}
return leaders[0]
}
@ -455,7 +447,7 @@ func (c *cluster) Followers() []*Raft {
expFollowers := len(c.rafts) - 1
followers := c.GetInState(Follower)
if len(followers) != expFollowers {
c.FailNowf("timeout waiting for %d followers (followers are %v)", expFollowers, followers)
c.t.Fatalf("timeout waiting for %d followers (followers are %v)", expFollowers, followers)
}
return followers
}
@ -551,7 +543,7 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) {
}
}
if fail {
c.FailNowf("at least one peer has the wrong notion of leader")
t.Fatalf("at least one peer has the wrong notion of leader")
}
}
@ -572,7 +564,7 @@ CHECK:
if len(first.logs) != len(fsm.logs) {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("FSM log length mismatch: %d %d",
t.Fatalf("FSM log length mismatch: %d %d",
len(first.logs), len(fsm.logs))
} else {
goto WAIT
@ -583,7 +575,7 @@ CHECK:
if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("FSM log mismatch at index %d", idx)
t.Fatalf("FSM log mismatch at index %d", idx)
} else {
goto WAIT
}
@ -592,7 +584,7 @@ CHECK:
if len(first.configurations) != len(fsm.configurations) {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("FSM configuration length mismatch: %d %d",
t.Fatalf("FSM configuration length mismatch: %d %d",
len(first.logs), len(fsm.logs))
} else {
goto WAIT
@ -603,7 +595,7 @@ CHECK:
if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) {
fsm.Unlock()
if time.Now().After(limit) {
c.FailNowf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx])
t.Fatalf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx])
} else {
goto WAIT
}
@ -626,7 +618,7 @@ WAIT:
func (c *cluster) getConfiguration(r *Raft) Configuration {
future := r.GetConfiguration()
if err := future.Error(); err != nil {
c.FailNowf("failed to get configuration: %v", err)
c.t.Fatalf("failed to get configuration: %v", err)
return Configuration{}
}
@ -647,7 +639,7 @@ CHECK:
otherSet := c.getConfiguration(raft)
if !reflect.DeepEqual(peerSet, otherSet) {
if time.Now().After(limit) {
c.FailNowf("peer mismatch: %+v %+v", peerSet, otherSet)
t.Fatalf("peer mismatch: %+v %+v", peerSet, otherSet)
} else {
goto WAIT
}
@ -700,7 +692,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
for i := 0; i < opts.Peers; i++ {
dir, err := ioutil.TempDir("", "raft")
if err != nil {
c.FailNowf("err: %v", err)
t.Fatalf("err: %v", err)
}
store := NewInmemStore()
@ -750,23 +742,23 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
peerConf := opts.Conf
peerConf.LocalID = configuration.Servers[i].ID
peerConf.Logger = newTestLeveledLoggerWithPrefix(t, string(configuration.Servers[i].ID))
peerConf.Logger = newTestLoggerWithPrefix(t, string(configuration.Servers[i].ID))
if opts.Bootstrap {
err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration)
if err != nil {
c.FailNowf("BootstrapCluster failed: %v", err)
t.Fatalf("BootstrapCluster failed: %v", err)
}
}
raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans)
if err != nil {
c.FailNowf("NewRaft failed: %v", err)
t.Fatalf("NewRaft failed: %v", err)
}
raft.RegisterObserver(NewObserver(c.observationCh, false, nil))
if err != nil {
c.FailNowf("RegisterObserver failed: %v", err)
t.Fatalf("RegisterObserver failed: %v", err)
}
c.rafts = append(c.rafts, raft)
}

4
vendor/modules.txt vendored
View File

@ -32,7 +32,7 @@ github.com/NYTimes/gziphandler
github.com/StackExchange/wmi
# github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/circbuf
# github.com/armon/go-metrics v0.3.6
# github.com/armon/go-metrics v0.3.7
github.com/armon/go-metrics
github.com/armon/go-metrics/circonus
github.com/armon/go-metrics/datadog
@ -481,7 +481,7 @@ github.com/hashicorp/mdns
github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v1.2.0
# github.com/hashicorp/raft v1.3.0
github.com/hashicorp/raft
# github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-autopilot

View File

@ -1762,29 +1762,50 @@ bind_addr = "{{ GetPrivateInterfaces | include \"network\" \"10.0.0.0/8\" | attr
- `raft_protocol` ((#raft_protocol)) Equivalent to the [`-raft-protocol`
command-line flag](#_raft_protocol).
- `raft_snapshot_threshold` ((#\_raft_snapshot_threshold)) This controls
the minimum number of raft commit entries between snapshots that are saved to disk.
This is a low-level parameter that should rarely need to be changed. Very busy
clusters experiencing excessive disk IO may increase this value to reduce disk
IO, and minimize the chances of all servers taking snapshots at the same time.
- `raft_snapshot_threshold` ((#\_raft_snapshot_threshold)) This controls the
minimum number of raft commit entries between snapshots that are saved to
disk. This is a low-level parameter that should rarely need to be changed.
Very busy clusters experiencing excessive disk IO may increase this value to
reduce disk IO, and minimize the chances of all servers taking snapshots at
the same time. Increasing this trades off disk IO for disk space since the log
will grow much larger and the space in the raft.db file can't be reclaimed
till the next snapshot. Servers may take longer to recover from crashes or
failover if this is increased significantly as more logs will need to be
replayed. In Consul 1.1.0 and later this defaults to 16384, and in prior
versions it was set to 8192.
Since Consul 1.10.0 this can be reloaded using `consul reload` or sending the
server a `SIGHUP` to allow tuning snapshot activity without a rolling restart
in emergencies.
- `raft_snapshot_interval` ((#\_raft_snapshot_interval)) This controls how often
servers check if they need to save a snapshot to disk. This is a low-level
parameter that should rarely need to be changed. Very busy clusters
experiencing excessive disk IO may increase this value to reduce disk IO, and
minimize the chances of all servers taking snapshots at the same time.
Increasing this trades off disk IO for disk space since the log will grow much
larger and the space in the raft.db file can't be reclaimed till the next snapshot.
Servers may take longer to recover from crashes or failover if this is increased
significantly as more logs will need to be replayed. In Consul 1.1.0 and later
this defaults to 16384, and in prior versions it was set to 8192.
larger and the space in the raft.db file can't be reclaimed till the next
snapshot. Servers may take longer to recover from crashes or failover if this
is increased significantly as more logs will need to be replayed. In Consul
1.1.0 and later this defaults to `30s`, and in prior versions it was set to
`5s`.
- `raft_snapshot_interval` ((#\_raft_snapshot_interval)) This controls how often servers check if they
need to save a snapshot to disk. This is a low-level parameter that should rarely need to be changed. Very busy clusters experiencing excessive disk IO may increase this value to reduce disk IO, and minimize the chances of all servers taking snapshots at the same time. Increasing this trades
off disk IO for disk space since the log will grow much larger and the space in the raft.db file can't be reclaimed till the next snapshot. Servers may take longer to recover from crashes or failover if this is increased significantly as more logs will need to be replayed. In Consul 1.1.0 and later this defaults to `30s`, and in prior versions it was set to `5s`.
Since Consul 1.10.0 this can be reloaded using `consul reload` or sending the
server a `SIGHUP` to allow tuning snapshot activity without a rolling restart
in emergencies.
- `raft_trailing_logs` - This controls how many
log entries are left in the log store on disk after a snapshot is made. This should
only be adjusted when followers cannot catch up to the leader due to a very large
snapshot size that and high write throughput causing log truncation before an snapshot
can be fully installed. If you need to use this to recover a cluster, consider
reducing write throughput or the amount of data stored on Consul as it is likely
under a load it is not designed to handle. The default value is 10000 which is
suitable for all normal workloads. Added in Consul 1.5.3.
- `raft_trailing_logs` - This controls how many log entries are left in the log
store on disk after a snapshot is made. This should only be adjusted when
followers cannot catch up to the leader due to a very large snapshot size
and high write throughput causing log truncation before an snapshot can be
fully installed on a follower. If you need to use this to recover a cluster,
consider reducing write throughput or the amount of data stored on Consul as
it is likely under a load it is not designed to handle. The default value is
10000 which is suitable for all normal workloads. Added in Consul 1.5.3.
Since Consul 1.10.0 this can be reloaded using `consul reload` or sending the
server a `SIGHUP` to allow recovery without downtime when followers can't keep
up.
- `reap` This controls Consul's automatic reaping of child processes,
which is useful if Consul is running as PID 1 in a Docker container. If this isn't
@ -2292,6 +2313,13 @@ items which are reloaded include:
- Log level
- [Metric Prefix Filter](#telemetry-prefix_filter)
- [Node Metadata](#node_meta)
- Some Raft options (since Consul 1.10.0)
- [`raft_snapshot_threshold`](#_raft_snapshot_threshold)
- [`raft_snapshot_interval`](#_raft_snapshot_interval)
- [`raft_trailing_logs`](#_raft_trailing_logs)
- These can be important in certain outage situations so being able to control
them without a restart provides a recovery path that doesn't involve
downtime. They generally shouldn't be changed otherwise.
- [RPC rate limiting](#limits)
- [HTTP Maximum Connections per Client](#http_max_conns_per_client)
- Services

View File

@ -147,6 +147,109 @@ you will need to apply a function such as InfluxDB's [`non_negative_difference()
Sudden large changes to the `consul.client.rpc` metrics (greater than 50% deviation from baseline).
`consul.client.rpc.exceeded` or `consul.client.rpc.failed` count > 0, as it implies that an agent is being rate-limited or fails to make an RPC request to a Consul server
### Raft Replication Capacity Issues
| Metric Name | Description | Unit | Type |
| :--------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------- | :------ |
| `consul.raft.fsm.lastRestoreDuration` | Measures the time taken to restore the FSM from a snapshot on an agent restart or from the leader calling installSnapshot. This is a gauge that holds it's value since most servers only restore during restarts which are typically infrequent. | ms | gauge |
| `consul.raft.leader.oldestLogAge` | The number of milliseconds since the _oldest_ log in the leader's log store was written. This can be important for replication health where write rate is high and the snapshot is large as followers may be unable to recover from a restart if restoring takes longer than the minimum value for the current leader. Compare this with `consul.raft.fsm.lastRestoreDuration` and `consul.raft.rpc.installSnapshot` to monitor. In normal usage this gauge value will grow linearly over time until a snapshot completes on the leader and the log is truncated. | ms | gauge |
| `consul.raft.rpc.installSnapshot` | Measures the time taken to process the installSnapshot RPC call. This metric should only be seen on agents which are currently in the follower state. | ms | timer |
**Why they're important:** These metrics allow operators to monitor the health
and capacity of raft replication on servers. **When Consul is handling large
amounts of data and high write throughput** it is possible for the cluster to
get into the following state:
* Write throughput is high (say 500 commits per second or more) and constant
* The leader is writing out a large snapshot every minute or so
* The snapshot is large enough that it takes considerable time to restore from
disk on a restart or from the leader if a follower gets behind
* Disk IO available allows the leader to write a snapshot faster than it can be
restored from disk on a follower
Under these conditions, a follower after a restart may be unable to catch up on
replication and become a voter again since it takes longer to restore from disk
or the leader than the leader takes to write a new snapshot and truncate its
logs. Servers retain
[`raft_trailing_logs`](/docs/agent/options#_raft_trailing_logs) (default
`10240`) log entries even if their snapshot was more recent. On a leader
processing 500 commits/second, that is only about 20 seconds worth of logs.
Assuming the leader is able to write out a snapshot and truncate the logs in
less than 20 seconds, there will only be 20 seconds worth of "recent" logs
available on the leader right after the leader has taken a snapshot and never
more than about 80 seconds worth assuming it is taking a snapshot and truncating
logs every 60 seconds.
In this state, followers must be able to restore a snapshot into memory and
resume replication in under 80 seconds otherwise they will never be able to
rejoin the cluster until write rates reduce. If they take more than 20 seconds
then there will be a chance that they are unlucky with timing when they restart
and have to download a snapshot again from the servers one or more times. If
they take 50 seconds or more then they will likely fail to catch up more often
than they succeed and will remain non-voters for some time until they happen to
complete the restore just before the leader truncates its logs.
In the worst case, the follower will be left continually downloading snapshots
from the leader which are always too old to use by the time they are restored.
This can put additional strain on the leader transferring large snapshots
repeatedly as well as reduce the fault tolerance and serving capacity of the
cluster.
Since Consul 1.5.3
[`raft_trailing_logs`](/docs/agent/options#_raft_trailing_logs) has been
configurable. Increasing it allows the leader to retain more logs and give
followers more time to restore and catch up. The tradeoff is potentially
slower appends which eventually might affect write throughput and latency
negatively so setting it arbitrarily high is not recommended. Before Consul
1.10.0 it required a rolling restart to change this configuration on the leader
though and since no followers could restart without loosing health this could
mean loosing cluster availability and needing to recover the cluster from a loss
of quorum.
Since Consul 1.10.0
[`raft_trailing_logs`](/docs/agent/options#_raft_trailing_logs) is now
reloadable with `consul reload` or `SIGHUP` allowing operators to increase this
without the leader restarting or loosing leadership allowing the cluster to be
recovered gracefully.
Monitoring these metrics can help avoid or diagnose this state.
**What to look for:**
`consul.raft.leader.oldestLogAge` should look like a saw-tooth wave increasing
linearly with time until the leader takes a snapshot and then jumping down as
the oldest logs are truncated. The lowest point on that line should remain
comfortably higher (i.e. 2x or more) than the time it takes to restore a
snapshot.
There are two ways a snapshot can be restored on a follower: from disk on
startup or from the leader during an `installSnapshot` RPC. The leader only
sends an `installSnapshot` RPC if the follower is new and has no state, or if
it's state is too old for it to catch up with the leaders logs.
`consul.raft.fsm.lastRestoreDuration` shows the time it took to restore from
either source the last time it happened. Most of the time this is when the
server was started. It's a gauge that will always show the last restore duration
(in Consul 1.10.0 and later) however long ago that was.
`consul.raft.rpc.installSnapshot` is the timing information from the leader's
perspective when it installs a new snapshot on a follower. It includes the time
spent transferring the data as well as the follower restoring it. Since these
events are typically infrequent, you may need to graph the last value observed,
for example using `max_over_time` with a large range in Prometheus. While the
restore part will also be reflected in `lastRestoreDuration`, it can be useful
to observe this too since the logs need to be able to cover this entire
operation including the snapshot delivery to ensure followers can always catch
up safely.
Graphing `consul.raft.leader.oldestLogAge` on the same axes as the other two
metrics here can help see at a glance if restore times are creeping dangerously
close to the limit of what the leader is retaining at the current write rate.
Note that if servers don't restart often, then the snapshot could have grown
significantly since the last restore happened so last restore times might not
reflect what would happen if an agent restarts now.
## Metrics Reference
This is a full list of metrics emitted by Consul.
@ -215,41 +318,43 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.cache.fetch_success` | Counts the number of successful fetches by the cache. | counter | counter |
| `consul.cache.fetch_error` | Counts the number of failed fetches by the cache. | counter | counter |
| `consul.cache.evict_expired` | Counts the number of expired entries that are evicted. | counter | counter |
| `consul.raft.fsm.snapshot` | Measures the time taken by the FSM to record the current state for the snapshot. | ms | timer |
| `consul.raft.fsm.apply` | The number of logs committed since the last interval. | commit logs / interval | counter |
| `consul.raft.commitNumLogs` | Measures the count of logs processed for application to the FSM in a single batch. | logs | gauge |
| `consul.raft.fsm.enqueue` | Measures the amount of time to enqueue a batch of logs for the FSM to apply. | ms | timer |
| `consul.raft.fsm.restore` | Measures the time taken by the FSM to restore its state from a snapshot. | ms | timer |
| `consul.raft.snapshot.create` | Measures the time taken to initialize the snapshot process. | ms | timer |
| `consul.raft.snapshot.persist` | Measures the time taken to dump the current snapshot taken by the Consul agent to the disk. | ms | timer |
| `consul.raft.snapshot.takeSnapshot` | Measures the total time involved in taking the current snapshot (creating one and persisting it) by the Consul agent. | ms | timer |
| `consul.raft.replication.heartbeat` | Measures the time taken to invoke appendEntries on a peer, so that it doesnt timeout on a periodic basis. | ms | timer |
| `consul.serf.snapshot.appendLine` | Measures the time taken by the Consul agent to append an entry into the existing log. | ms | timer |
| `consul.serf.snapshot.compact` | Measures the time taken by the Consul agent to compact a log. This operation occurs only when the snapshot becomes large enough to justify the compaction . | ms | timer |
| `consul.raft.applied_index` | Represents the raft applied index. | index | gauge |
| `consul.raft.last_index` | Represents the raft applied index. | index | gauge |
| `consul.raft.state.leader` | Increments whenever a Consul server becomes a leader. If there are frequent leadership changes this may be indication that the servers are overloaded and aren't meeting the soft real-time requirements for Raft, or that there are networking problems between the servers. | leadership transitions / interval | counter |
| `consul.raft.state.candidate` | Increments whenever a Consul server starts an election. If this increments without a leadership change occurring it could indicate that a single server is overloaded or is experiencing network connectivity issues. | election attempts / interval | counter |
| `consul.raft.apply` | Counts the number of Raft transactions occurring over the interval, which is a general indicator of the write load on the Consul servers. | raft transactions / interval | counter |
| `consul.raft.barrier` | Counts the number of times the agent has started the barrier i.e the number of times it has issued a blocking call, to ensure that the agent has all the pending operations that were queued, to be applied to the agent's FSM. | blocks / interval | counter |
| `consul.raft.verify_leader` | Counts the number of times an agent checks whether it is still the leader or not | checks / interval | Counter |
| `consul.raft.restore` | Counts the number of times the restore operation has been performed by the agent. Here, restore refers to the action of raft consuming an external snapshot to restore its state. | operation invoked / interval | counter |
| `consul.raft.commitNumLogs` | Measures the count of logs processed for application to the FSM in a single batch. | logs | gauge |
| `consul.raft.commitTime` | Measures the time it takes to commit a new entry to the Raft log on the leader. | ms | timer |
| `consul.raft.fsm.lastRestoreDuration` | Measures the time taken to restore the FSM from a snapshot on an agent restart or from the leader calling installSnapshot. This is a gauge that holds it's value since most servers only restore during restarts which are typically infrequent. | ms | gauge |
| `consul.raft.fsm.snapshot` | Measures the time taken by the FSM to record the current state for the snapshot. | ms | timer |
| `consul.raft.fsm.apply` | The number of logs committed since the last interval. | commit logs / interval | counter |
| `consul.raft.fsm.enqueue` | Measures the amount of time to enqueue a batch of logs for the FSM to apply. | ms | timer |
| `consul.raft.fsm.restore` | Measures the time taken by the FSM to restore its state from a snapshot. | ms | timer |
| `consul.raft.last_index` | Represents the raft applied index. | index | gauge |
| `consul.raft.leader.dispatchLog` | Measures the time it takes for the leader to write log entries to disk. | ms | timer |
| `consul.raft.leader.dispatchNumLogs` | Measures the number of logs committed to disk in a batch. | logs | gauge |
| `consul.raft.leader.lastContact` | Measures the time since the leader was last able to contact the follower nodes when checking its leader lease. It can be used as a measure for how stable the Raft timing is and how close the leader is to timing out its lease.The lease timeout is 500 ms times the [`raft_multiplier` configuration](/docs/agent/options#raft_multiplier), so this telemetry value should not be getting close to that configured value, otherwise the Raft timing is marginal and might need to be tuned, or more powerful servers might be needed. See the [Server Performance](/docs/install/performance) guide for more details. | ms | timer |
| `consul.raft.leader.oldestLogAge` | The number of milliseconds since the _oldest_ log in the leader's log store was written. This can be important for replication health where write rate is high and the snapshot is large as followers may be unable to recover from a restart if restoring takes longer than the minimum value for the current leader. Compare this with `consul.raft.fsm.lastRestoreDuration` and `consul.raft.rpc.installSnapshot` to monitor. In normal usage this gauge value will grow linearly over time until a snapshot completes on the leader and the log is truncated. Note: this metric won't be emitted until the leader writes a snapshot. After an upgrade to Consul 1.10.0 it won't be emitted until the oldest log was written after the upgrade. | ms | gauge |
| `consul.raft.replication.heartbeat` | Measures the time taken to invoke appendEntries on a peer, so that it doesnt timeout on a periodic basis. | ms | timer |
| `consul.raft.replication.appendEntries` | Measures the time it takes to replicate log entries to followers. This is a general indicator of the load pressure on the Consul servers, as well as the performance of the communication between the servers. | ms | timer |
| `consul.raft.state.follower` | Counts the number of times an agent has entered the follower mode. This happens when a new agent joins the cluster or after the end of a leader election. | follower state entered / interval | counter |
| `consul.raft.transistion.heartbeat_timeout` | The number of times an agent has transitioned to the Candidate state, after receive no heartbeat messages from the last known leader. | timeouts / interval | counter |
| `consul.raft.replication.appendEntries.rpc` | Measures the time taken by the append entries RFC, to replicate the log entries of a leader agent onto its follower agent(s) | ms | timer |
| `consul.raft.replication.appendEntries.logs` | Measures the number of logs replicated to an agent, to bring it up to speed with the leader's logs. | logs appended/ interval | counter |
| `consul.raft.restore` | Counts the number of times the restore operation has been performed by the agent. Here, restore refers to the action of raft consuming an external snapshot to restore its state. | operation invoked / interval | counter |
| `consul.raft.restoreUserSnapshot` | Measures the time taken by the agent to restore the FSM state from a user's snapshot | ms | timer |
| `consul.raft.rpc.processHeartBeat` | Measures the time taken to process a heartbeat request. | ms | timer |
| `consul.raft.rpc.appendEntries` | Measures the time taken to process an append entries RPC call from an agent. | ms | timer |
| `consul.raft.rpc.appendEntries.storeLogs` | Measures the time taken to add any outstanding logs for an agent, since the last appendEntries was invoked | ms | timer |
| `consul.raft.rpc.appendEntries.processLogs` | Measures the time taken to process the outstanding log entries of an agent. | ms | timer |
| `consul.raft.rpc.requestVote` | Measures the time taken to process the request vote RPC call. | ms | timer |
| `consul.raft.rpc.installSnapshot` | Measures the time taken to process the installSnapshot RPC call. This metric should only be seen on agents which are currently in the follower state. | ms | timer |
| `consul.raft.replication.appendEntries.rpc` | Measures the time taken by the append entries RFC, to replicate the log entries of a leader agent onto its follower agent(s) | ms | timer |
| `consul.raft.replication.appendEntries.logs` | Measures the number of logs replicated to an agent, to bring it up to speed with the leader's logs. | logs appended/ interval | counter |
| `consul.raft.leader.lastContact` | This will only be emitted by the Raft leader and measures the time since the leader was last able to contact the follower nodes when checking its leader lease. It can be used as a measure for how stable the Raft timing is and how close the leader is to timing out its lease.The lease timeout is 500 ms times the [`raft_multiplier` configuration](/docs/agent/options#raft_multiplier), so this telemetry value should not be getting close to that configured value, otherwise the Raft timing is marginal and might need to be tuned, or more powerful servers might be needed. See the [Server Performance](/docs/install/performance) guide for more details. | ms | timer |
| `consul.raft.rpc.processHeartBeat` | Measures the time taken to process a heartbeat request. | ms | timer |
| `consul.raft.rpc.requestVote` | Measures the time taken to process the request vote RPC call. | ms | timer |
| `consul.raft.snapshot.create` | Measures the time taken to initialize the snapshot process. | ms | timer |
| `consul.raft.snapshot.persist` | Measures the time taken to dump the current snapshot taken by the Consul agent to the disk. | ms | timer |
| `consul.raft.snapshot.takeSnapshot` | Measures the total time involved in taking the current snapshot (creating one and persisting it) by the Consul agent. | ms | timer |
| `consul.serf.snapshot.appendLine` | Measures the time taken by the Consul agent to append an entry into the existing log. | ms | timer |
| `consul.serf.snapshot.compact` | Measures the time taken by the Consul agent to compact a log. This operation occurs only when the snapshot becomes large enough to justify the compaction . | ms | timer |
| `consul.raft.state.candidate` | Increments whenever a Consul server starts an election. If this increments without a leadership change occurring it could indicate that a single server is overloaded or is experiencing network connectivity issues. | election attempts / interval | counter |
| `consul.raft.state.leader` | Increments whenever a Consul server becomes a leader. If there are frequent leadership changes this may be indication that the servers are overloaded and aren't meeting the soft real-time requirements for Raft, or that there are networking problems between the servers. | leadership transitions / interval | counter |
| `consul.raft.state.follower` | Counts the number of times an agent has entered the follower mode. This happens when a new agent joins the cluster or after the end of a leader election. | follower state entered / interval | counter |
| `consul.raft.transistion.heartbeat_timeout` | The number of times an agent has transitioned to the Candidate state, after receive no heartbeat messages from the last known leader. | timeouts / interval | counter |
| `consul.raft.verify_leader` | Counts the number of times an agent checks whether it is still the leader or not | checks / interval | Counter |
| `consul.rpc.accept_conn` | Increments when a server accepts an RPC connection. | connections | counter |
| `consul.catalog.register` | Measures the time it takes to complete a catalog register operation. | ms | timer |
| `consul.catalog.deregister` | Measures the time it takes to complete a catalog deregister operation. | ms | timer |