Bumps default Raft protocol to version 3. (#3477)

* Changes default Raft protocol to 3.

* Changes numPeers() to report only voters.

This should have been there before, but it's more obvious that this
is incorrect now that we default the Raft protocol to 3, which puts
new servers in a read-only state while Autopilot waits for them to
become healthy.

* Fixes TestLeader_RollRaftServer.

* Fixes TestOperator_RaftRemovePeerByAddress.

* Fixes TestServer_*.

Relaxed the check for a given number of voter peers and instead do
a thorough check that all servers see each other in their Raft
configurations.

* Fixes TestACL_*.

These now just check for Raft replication to be set up, and don't
care about the number of voter peers.

* Fixes TestOperator_Raft_ListPeers.

* Fixes TestAutopilot_CleanupDeadServerPeriodic.

* Fixes TestCatalog_ListNodes_ConsistentRead_Fail.

* Fixes TestLeader_ChangeServerID and adjusts the conn pool to throw away
sockets when it sees io.EOF.

* Changes version to 1.0.0 in the options doc.

* Makes metrics test more deterministic with autopilot metrics possible.
This commit is contained in:
James Phillips 2017-09-25 15:27:04 -07:00 committed by GitHub
parent 8bc8c25cfb
commit 45646ac3f4
14 changed files with 160 additions and 83 deletions

View File

@ -2,6 +2,8 @@
BREAKING CHANGES: BREAKING CHANGES:
* **Raft Protocol Defaults to 3:** The [`-raft-protocol`](https://www.consul.io/docs/agent/options.html#_raft_protocol) default has been changed from 2 to 3, enabling all Autopilot features by default. Version 3 requires Consul running 0.8.0 or newer on all servers in order to work, so if you are upgrading with older servers in a cluster then you will need to set this back to 2 in order to upgrade. See [Raft Protocol Version Compatibility](https://www.consul.io/docs/upgrade-specific.html#raft-protocol-version-compatibility) for more details. Also the format of `peers.json` used for outage recovery is different when running with the latest Raft protocol. See [Manual Recovery Using peers.json](https://www.consul.io/docs/guides/outage.html#manual-recovery-using-peers-json) for a description of the required format.
FEATURES: FEATURES:
IMPROVEMENTS: IMPROVEMENTS:
@ -22,7 +24,7 @@ IMPROVEMENTS:
* agent: Switched to using a read lock for the agent's RPC dispatcher, which prevents RPC calls from getting serialized. [GH-3376] * agent: Switched to using a read lock for the agent's RPC dispatcher, which prevents RPC calls from getting serialized. [GH-3376]
* agent: When joining a cluster, Consul now skips the unique node ID constraint for Consul members running Consul older than 0.8.5. This makes it easier to upgrade to newer versions of Consul in an existing cluster with non-unique node IDs. [GH-3070] * agent: When joining a cluster, Consul now skips the unique node ID constraint for Consul members running Consul older than 0.8.5. This makes it easier to upgrade to newer versions of Consul in an existing cluster with non-unique node IDs. [GH-3070]
* build: Upgraded Go version to 1.9. [GH-3428] * build: Upgraded Go version to 1.9. [GH-3428]
* server: Consul servers can re-establish quorum after all of them change their IP addresses upon a restart. [GH-1580] * server: Consul servers can re-establish quorum after all of them change their IP addresses upon a restart. [GH-1580]
BUG FIXES: BUG FIXES:

View File

@ -235,7 +235,7 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) }) retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
client := rpcClient(t, s1) client := rpcClient(t, s1)
defer client.Close() defer client.Close()
@ -278,7 +278,7 @@ func TestACL_NonAuthority_Found(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) }) retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -346,7 +346,7 @@ func TestACL_NonAuthority_Management(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) }) retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -395,7 +395,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) }) retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -461,7 +461,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) }) retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -529,7 +529,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) }) retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -89,6 +89,7 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
c.Datacenter = "dc1" c.Datacenter = "dc1"
c.Bootstrap = false c.Bootstrap = false
} }
dir2, s2 := testServerWithConfig(t, conf) dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
@ -101,24 +102,35 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
defer os.RemoveAll(dir4) defer os.RemoveAll(dir4)
defer s4.Shutdown() defer s4.Shutdown()
servers := []*Server{s1, s2, s3, s4} dir5, s5 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir5)
defer s5.Shutdown()
// Join the servers to s1 servers := []*Server{s1, s2, s3, s4, s5}
// Join the servers to s1, and wait until they are all promoted to
// voters.
for _, s := range servers[1:] { for _, s := range servers[1:] {
joinLAN(t, s, s1) joinLAN(t, s, s1)
} }
retry.Run(t, func(r *retry.R) {
for _, s := range servers { r.Check(wantRaft(servers))
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 4)) }) for _, s := range servers {
} r.Check(wantPeers(s, 5))
}
})
// Kill a non-leader server // Kill a non-leader server
s4.Shutdown() s4.Shutdown()
// Should be removed from the peers automatically // Should be removed from the peers automatically
for _, s := range []*Server{s1, s2, s3} { servers = []*Server{s1, s2, s3, s5}
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) retry.Run(t, func(r *retry.R) {
} r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
} }
func TestAutopilot_CleanupStaleRaftServer(t *testing.T) { func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {

View File

@ -779,29 +779,38 @@ func TestCatalog_ListNodes_ConsistentRead_Fail(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false) dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join and wait for all servers to get promoted to voters.
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
joinLAN(t, s3, s2)
servers := []*Server{s1, s2, s3}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
testrpc.WaitForLeader(t, s1.RPC, "dc1") // Use the leader as the client, kill the followers.
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Use the leader as the client, kill the follower
var codec rpc.ClientCodec var codec rpc.ClientCodec
if s1.IsLeader() { for _, s := range servers {
codec = codec1 if s.IsLeader() {
s2.Shutdown() codec = rpcClient(t, s)
} else { defer codec.Close()
codec = codec2 } else {
s1.Shutdown() s.Shutdown()
}
}
if codec == nil {
t.Fatalf("no leader")
} }
args := structs.DCSpecificRequest{ args := structs.DCSpecificRequest{
@ -813,7 +822,6 @@ func TestCatalog_ListNodes_ConsistentRead_Fail(t *testing.T) {
if err == nil || !strings.HasPrefix(err.Error(), "leadership lost") { if err == nil || !strings.HasPrefix(err.Error(), "leadership lost") {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if out.QueryMeta.LastContact != 0 { if out.QueryMeta.LastContact != 0 {
t.Fatalf("should not have a last contact time") t.Fatalf("should not have a last contact time")
} }

View File

@ -434,10 +434,9 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
// TODO: default to 3 in Consul 0.9 // Raft protocol version 3 only works with other Consul servers running
// Use a transitional version of the raft protocol to interoperate with // 0.8.0 or later.
// versions 1 and 3 conf.RaftConfig.ProtocolVersion = 3
conf.RaftConfig.ProtocolVersion = 2
// Disable shutdown on removal // Disable shutdown on removal
conf.RaftConfig.ShutdownOnRemove = false conf.RaftConfig.ShutdownOnRemove = false

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -41,6 +42,42 @@ func wantPeers(s *Server, peers int) error {
return nil return nil
} }
// wantRaft determines if the servers have all of each other in their
// Raft configurations. It returns nil only when every given server's Raft
// configuration contains exactly the given set of servers (matched by
// RaftConfig.LocalID), and otherwise returns the first mismatch found.
// Suffrage is not inspected here, so this does not tell voters apart from
// non-voters; pair it with wantPeers when a voter count matters.
func wantRaft(servers []*Server) error {
	// Make sure all the servers are represented in the Raft config,
	// and that there are no extras.
	verifyRaft := func(c raft.Configuration) error {
		// Build the set of expected server IDs from the servers' own configs.
		want := make(map[raft.ServerID]bool)
		for _, s := range servers {
			want[s.config.RaftConfig.LocalID] = true
		}

		// Tick off each configured server; anything not in the expected set
		// (or already ticked off) is reported as unwanted.
		for _, s := range c.Servers {
			if !want[s.ID] {
				return fmt.Errorf("don't want %q", s.ID)
			}
			delete(want, s.ID)
		}

		// Whatever remains was expected but is missing from this configuration.
		if len(want) > 0 {
			return fmt.Errorf("didn't find %v", want)
		}
		return nil
	}

	// Check each server's own view of the Raft configuration.
	for _, s := range servers {
		future := s.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		if err := verifyRaft(future.Configuration()); err != nil {
			return err
		}
	}
	return nil
}
// joinAddrLAN returns the address other servers can // joinAddrLAN returns the address other servers can
// use to join the cluster on the LAN interface. // use to join the cluster on the LAN interface.
func joinAddrLAN(s *Server) string { func joinAddrLAN(s *Server) string {

View File

@ -703,6 +703,7 @@ func TestLeader_RollRaftServer(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) { dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true c.Bootstrap = true
c.Datacenter = "dc1" c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
@ -715,7 +716,11 @@ func TestLeader_RollRaftServer(t *testing.T) {
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false) dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer os.RemoveAll(dir3) defer os.RemoveAll(dir3)
defer s3.Shutdown() defer s3.Shutdown()
@ -803,10 +808,9 @@ func TestLeader_ChangeServerID(t *testing.T) {
servers := []*Server{s1, s2, s3} servers := []*Server{s1, s2, s3}
// Try to join // Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
joinLAN(t, s3, s1) joinLAN(t, s3, s1)
for _, s := range servers { for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
} }
@ -841,10 +845,21 @@ func TestLeader_ChangeServerID(t *testing.T) {
joinLAN(t, s4, s1) joinLAN(t, s4, s1)
servers[2] = s4 servers[2] = s4
// While integrating #3327 we discovered that this test was flaky. The
// connection pool would use the same TCP connection to the old server
// which would give EOF errors to the autopilot health check RPC call.
// To make this more reliable we changed the connection pool to throw
// away the connection if it sees an EOF error, since there's no way
// that connection is going to work again. This made this test reliable
// since it will make a new connection to s4.
// Make sure the dead server is removed and we're back to 3 total peers // Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers { retry.Run(t, func(r *retry.R) {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) r.Check(wantRaft(servers))
} for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
} }
func TestLeader_ACL_Initialization(t *testing.T) { func TestLeader_ACL_Initialization(t *testing.T) {

View File

@ -6,6 +6,7 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -157,7 +158,8 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
// Add it manually to Raft. // Add it manually to Raft.
{ {
future := s1.raft.AddPeer(arg.Address) id := raft.ServerID("fake-node-id")
future := s1.raft.AddVoter(id, arg.Address, 0, time.Second)
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }

View File

@ -588,8 +588,6 @@ func (s *Server) setupRaft() error {
return err return err
} }
if !hasState { if !hasState {
// TODO (slackpad) - This will need to be updated when
// we add support for node IDs.
configuration := raft.Configuration{ configuration := raft.Configuration{
Servers: []raft.Server{ Servers: []raft.Server{
raft.Server{ raft.Server{
@ -835,15 +833,22 @@ func (s *Server) Leave() error {
return nil return nil
} }
// numPeers is used to check on the number of known peers, including the local // numPeers is used to check on the number of known peers, including potentially
// node. // the local node. We count only voters, since others can't actually become
// leader, so aren't considered peers.
func (s *Server) numPeers() (int, error) { func (s *Server) numPeers() (int, error) {
future := s.raft.GetConfiguration() future := s.raft.GetConfiguration()
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
return 0, err return 0, err
} }
configuration := future.Configuration()
return len(configuration.Servers), nil var numPeers int
for _, server := range future.Configuration().Servers {
if server.Suffrage == raft.Voter {
numPeers++
}
}
return numPeers, nil
} }
// JoinLAN is used to have Consul join the inner-DC pool // JoinLAN is used to have Consul join the inner-DC pool

View File

@ -492,18 +492,10 @@ func TestServer_JoinLAN_TLS(t *testing.T) {
// Try to join // Try to join
joinLAN(t, s2, s1) joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 2; got != want {
r.Fatalf("got %d s1 LAN members want %d", got, want)
}
if got, want := len(s2.LANMembers()), 2; got != want {
r.Fatalf("got %d s2 LAN members want %d", got, want)
}
})
// Verify Raft has established a peer // Verify Raft has established a peer
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 2)) r.Check(wantRaft([]*Server{s1, s2}))
r.Check(wantPeers(s2, 2))
}) })
} }
@ -555,10 +547,7 @@ func TestServer_Expect(t *testing.T) {
// Wait for the new server to see itself added to the cluster. // Wait for the new server to see itself added to the cluster.
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
r.Check(wantPeers(s1, 4)) r.Check(wantRaft([]*Server{s1, s2, s3, s4}))
r.Check(wantPeers(s2, 4))
r.Check(wantPeers(s3, 4))
r.Check(wantPeers(s4, 4))
}) })
// Make sure there's still a leader and that the term didn't change, // Make sure there's still a leader and that the term didn't change,
@ -661,16 +650,10 @@ func TestServer_Encrypted(t *testing.T) {
} }
func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) { func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
// Try to join joinLAN(t, s1, s2)
addr := fmt.Sprintf("127.0.0.1:%d", retry.Run(t, func(r *retry.R) {
s1.config.SerfLANConfig.MemberlistConfig.BindPort) r.Check(wantRaft([]*Server{s1, s2}))
if _, err := s2.JoinLAN([]string{addr}); err != nil { })
t.Fatalf("err: %v", err)
}
// make sure both servers know about each other
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s2, 2)) })
// Have s2 make an RPC call to s1 // Have s2 make an RPC call to s1
var leader *metadata.Server var leader *metadata.Server

View File

@ -413,6 +413,15 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply) err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
if err != nil { if err != nil {
sc.Close() sc.Close()
// See the comment in leader_test.go TestLeader_ChangeServerID
// about how we found this. The tldr is that if we see this
// error, we know this connection is toast, so we should clear
// it and make a new one on the next attempt.
if err == io.EOF {
p.clearConn(conn)
}
p.releaseConn(conn) p.releaseConn(conn)
return fmt.Errorf("rpc error: %v", err) return fmt.Errorf("rpc error: %v", err)
} }

View File

@ -41,12 +41,15 @@ func TestAPI_AgentMetrics(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if len(metrics.Gauges) < 0 { var found bool
t.Fatalf("bad: %v", metrics) for _, g := range metrics.Gauges {
if g.Name == "consul.runtime.alloc_bytes" {
found = true
break
}
} }
if !found {
if metrics.Gauges[0].Name != "consul.runtime.alloc_bytes" { t.Fatalf("missing runtime metrics")
t.Fatalf("bad: %v", metrics.Gauges[0])
} }
} }

View File

@ -19,8 +19,8 @@ func TestOperator_Raft_ListPeers(t *testing.T) {
a := agent.NewTestAgent(t.Name(), ``) a := agent.NewTestAgent(t.Name(), ``)
defer a.Shutdown() defer a.Shutdown()
expected := fmt.Sprintf("%s 127.0.0.1:%d 127.0.0.1:%d leader true 2", expected := fmt.Sprintf("%s %s 127.0.0.1:%d leader true 3",
a.Config.NodeName, a.Config.ServerPort, a.Config.ServerPort) a.Config.NodeName, a.Config.NodeID, a.Config.ServerPort)
// Test the legacy mode with 'consul operator raft -list-peers' // Test the legacy mode with 'consul operator raft -list-peers'
{ {

View File

@ -458,9 +458,11 @@ will exit with an error at startup.
You can view the protocol versions supported by Consul by running `consul -v`. You can view the protocol versions supported by Consul by running `consul -v`.
* <a name="_raft_protocol"></a><a href="#_raft_protocol">`-raft-protocol`</a> - This controls the internal * <a name="_raft_protocol"></a><a href="#_raft_protocol">`-raft-protocol`</a> - This controls the internal
version of the Raft consensus protocol used for server communications. This defaults to 2 but must version of the Raft consensus protocol used for server communications. This must be set to 3 in order to
be set to 3 in order to gain access to Autopilot features, with the exception of gain access to Autopilot features, with the exception of [`cleanup_dead_servers`](#cleanup_dead_servers).
[`cleanup_dead_servers`](#cleanup_dead_servers). Defaults to 3 in Consul 1.0.0 and later (defaulted to 2 previously). See
[Raft Protocol Version Compatibility](/docs/upgrade-specific.html#raft-protocol-version-compatibility)
for more details.
* <a name="_recursor"></a><a href="#_recursor">`-recursor`</a> - Specifies the address of an upstream DNS * <a name="_recursor"></a><a href="#_recursor">`-recursor`</a> - Specifies the address of an upstream DNS
server. This option may be provided multiple times, and is functionally server. This option may be provided multiple times, and is functionally