Makes RPC handling more robust when rolling servers. (#3561)

* Adds client-side retry for no leader errors.

This paves over the case where the client was connected to the leader
when it loses leadership.

* Adds a configurable server RPC drain time and a fail-fast path for RPCs.

When a server leaves it gets removed from the Raft configuration, so it will
never know who the new leader server ends up being. Without this we'd be
doomed to wait out the RPC hold timeout and then fail. This makes things fail
a little quicker while a sever is draining, and since we added a client retry
AND since the server doing this has already shut down and left the Serf LAN,
clients should retry against some other server.

* Makes the RPC hold timeout configurable.

* Reorders struct members.

* Sets the RPC hold timeout default for test servers.

* Bumps the leave drain time up to 5 seconds.

* Robustifies retries with a simpler client-side RPC hold.

* Reverts untended delete.
This commit is contained in:
James Phillips 2017-10-10 15:19:50 -07:00 committed by GitHub
parent d8a1ec70f8
commit bb12368eac
16 changed files with 244 additions and 41 deletions

View File

@ -750,6 +750,14 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.RPCMaxBurst = a.config.RPCMaxBurst
}
// RPC-related performance configs.
if a.config.RPCHoldTimeout > 0 {
base.RPCHoldTimeout = a.config.RPCHoldTimeout
}
if a.config.LeaveDrainTime > 0 {
base.LeaveDrainTime = a.config.LeaveDrainTime
}
// set the src address for outgoing rpc connections
// Use port 0 so that outgoing connections use a random port.
if !ipaddr.IsAny(base.RPCAddr.IP) {

View File

@ -587,6 +587,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
KeyFile: b.stringVal(c.KeyFile),
LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
LeaveOnTerm: leaveOnTerm,
LogLevel: b.stringVal(c.LogLevel),
NodeID: types.NodeID(b.stringVal(c.NodeID)),
@ -596,6 +597,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
PidFile: b.stringVal(c.PidFile),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
RPCProtocol: b.intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),

View File

@ -344,7 +344,9 @@ type HTTPConfig struct {
}
type Performance struct {
RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
LeaveDrainTime *string `json:"leave_drain_time,omitempty" hcl:"leave_drain_time" mapstructure:"leave_drain_time"`
RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
RPCHoldTimeout *string `json:"rpc_hold_timeout" hcl:"rpc_hold_timeout" mapstructure:"rpc_hold_timeout"`
}
type Telemetry struct {

View File

@ -65,7 +65,9 @@ func DefaultSource() Source {
rpc_max_burst = 1000
}
performance = {
leave_drain_time = "5s"
raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + `
rpc_hold_timeout = "7s"
}
ports = {
dns = 8600

View File

@ -146,6 +146,7 @@ type RuntimeConfig struct {
HTTPSAddrs []net.Addr
HTTPSPort int
KeyFile string
LeaveDrainTime time.Duration
LeaveOnTerm bool
LogLevel string
NodeID types.NodeID
@ -154,6 +155,7 @@ type RuntimeConfig struct {
PidFile string
RPCAdvertiseAddr *net.TCPAddr
RPCBindAddr *net.TCPAddr
RPCHoldTimeout time.Duration
RPCMaxBurst int
RPCProtocol int
RPCRateLimit rate.Limit

View File

@ -2104,7 +2104,9 @@ func TestFullConfig(t *testing.T) {
"node_name": "otlLxGaI",
"non_voting_server": true,
"performance": {
"raft_multiplier": 5
"leave_drain_time": "8265s",
"raft_multiplier": 5,
"rpc_hold_timeout": "15707s"
},
"pid_file": "43xN80Km",
"ports": {
@ -2535,7 +2537,9 @@ func TestFullConfig(t *testing.T) {
node_name = "otlLxGaI"
non_voting_server = true
performance {
leave_drain_time = "8265s"
raft_multiplier = 5
rpc_hold_timeout = "15707s"
}
pid_file = "43xN80Km"
ports {
@ -3088,6 +3092,7 @@ func TestFullConfig(t *testing.T) {
HTTPSAddrs: []net.Addr{tcpAddr("95.17.17.19:15127")},
HTTPSPort: 15127,
KeyFile: "IEkkwgIA",
LeaveDrainTime: 8265 * time.Second,
LeaveOnTerm: true,
LogLevel: "k1zo9Spt",
NodeID: types.NodeID("AsUIlw99"),
@ -3097,6 +3102,7 @@ func TestFullConfig(t *testing.T) {
PidFile: "43xN80Km",
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
RPCHoldTimeout: 15707 * time.Second,
RPCProtocol: 30793,
RPCRateLimit: 12029.43,
RPCMaxBurst: 44848,
@ -3765,6 +3771,7 @@ func TestSanitize(t *testing.T) {
"HTTPSAddrs": [],
"HTTPSPort": 0,
"KeyFile": "hidden",
"LeaveDrainTime": "0s",
"LeaveOnTerm": false,
"LogLevel": "",
"NodeID": "",
@ -3774,6 +3781,7 @@ func TestSanitize(t *testing.T) {
"PidFile": "",
"RPCAdvertiseAddr": "",
"RPCBindAddr": "",
"RPCHoldTimeout": "0s",
"RPCMaxBurst": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,

View File

@ -233,6 +233,15 @@ func (c *Client) Encrypted() bool {
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
// TODO (slackpad) Plumb a deadline here with a context.
firstCheck := time.Now()
TRY:
server := c.routers.FindServer()
if server == nil {
return structs.ErrNoServers
@ -248,13 +257,28 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
}
// Make the request.
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
c.routers.NotifyFailedServer(server)
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
return err
rpcErr := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply)
if rpcErr == nil {
return nil
}
return nil
// Move off to another server, and see if we can retry.
c.logger.Printf("[ERR] consul: %q RPC failed to server %s: %v", method, server.Addr, rpcErr)
c.routers.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr); !retry {
return rpcErr
}
// We can wait a bit and retry!
if time.Now().Sub(firstCheck) < c.config.RPCHoldTimeout {
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / jitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
}
return rpcErr
}
// SnapshotRPC sends the snapshot request to one of the servers, reading from

View File

@ -180,6 +180,75 @@ func TestClient_RPC(t *testing.T) {
})
}
type leaderFailer struct {
totalCalls int
onceCalls int
}
func (l *leaderFailer) Always(args struct{}, reply *struct{}) error {
l.totalCalls++
return structs.ErrNoLeader
}
func (l *leaderFailer) Once(args struct{}, reply *struct{}) error {
l.totalCalls++
l.onceCalls++
switch {
case l.onceCalls == 1:
return structs.ErrNoLeader
default:
return nil
}
}
func TestClient_RPC_Retry(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 2 * time.Second
})
defer os.RemoveAll(dir2)
defer c1.Shutdown()
joinLAN(t, c1, s1)
retry.Run(t, func(r *retry.R) {
var out struct{}
if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil {
r.Fatalf("err: %v", err)
}
})
failer := &leaderFailer{}
if err := s1.RegisterEndpoint("Fail", failer); err != nil {
t.Fatalf("err: %v", err)
}
var out struct{}
if err := c1.RPC("Fail.Always", struct{}{}, &out); !structs.IsErrNoLeader(err) {
t.Fatalf("err: %v", err)
}
if got, want := failer.totalCalls, 2; got < want {
t.Fatalf("got %d want >= %d", got, want)
}
if err := c1.RPC("Fail.Once", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
if got, want := failer.onceCalls, 2; got < want {
t.Fatalf("got %d want >= %d", got, want)
}
if got, want := failer.totalCalls, 4; got < want {
t.Fatalf("got %d want >= %d", got, want)
}
}
func TestClient_RPC_Pool(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)

View File

@ -329,6 +329,10 @@ type Config struct {
RPCRate rate.Limit
RPCMaxBurst int
// LeaveDrainTime is used to wait after a server has left the LAN Serf
// pool for RPCs to drain and new requests to be sent to other servers.
LeaveDrainTime time.Duration
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
@ -406,12 +410,6 @@ func DefaultConfig() *Config {
CoordinateUpdateBatchSize: 128,
CoordinateUpdateMaxBatches: 5,
// This holds RPCs during leader elections. For the default Raft
// config the election timeout is 5 seconds, so we set this a
// bit longer to try to cover that period. This should be more
// than enough when running in the high performance mode.
RPCHoldTimeout: 7 * time.Second,
RPCRate: rate.Inf,
RPCMaxBurst: 1000,

View File

@ -177,6 +177,24 @@ func (s *Server) handleSnapshotConn(conn net.Conn) {
}()
}
// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error) bool {
// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
return true
}
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) {
return true
}
return false
}
// forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
@ -195,8 +213,15 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
}
CHECK_LEADER:
// Fail fast if we are in the process of leaving
select {
case <-s.leaveCh:
return true, structs.ErrNoLeader
default:
}
// Find the leader
isLeader, remoteServer := s.getLeader()
isLeader, leader := s.getLeader()
// Handle the case we are the leader
if isLeader {
@ -204,11 +229,17 @@ CHECK_LEADER:
}
// Handle the case of a known leader
if remoteServer != nil {
err := s.forwardLeader(remoteServer, method, args, reply)
return true, err
rpcErr := structs.ErrNoLeader
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.Addr,
leader.Version, method, leader.UseTLS, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY
}
return true, rpcErr
}
RETRY:
// Gate the request until there is a leader
if firstCheck.IsZero() {
firstCheck = time.Now()
@ -218,12 +249,13 @@ CHECK_LEADER:
select {
case <-time.After(jitter):
goto CHECK_LEADER
case <-s.leaveCh:
case <-s.shutdownCh:
}
}
// No leader found and hold time exceeded
return true, structs.ErrNoLeader
return true, rpcErr
}
// getLeader returns if the current node is the leader, and if not then it
@ -248,15 +280,6 @@ func (s *Server) getLeader() (bool, *metadata.Server) {
return false, server
}
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(server *metadata.Server, method string, args interface{}, reply interface{}) error {
// Handle a missing server
if server == nil {
return structs.ErrNoLeader
}
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply)
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
manager, server, ok := s.router.FindRoute(dc)

View File

@ -148,9 +148,16 @@ type Server struct {
// updated
reconcileCh chan serf.Member
// used to track when the server is ready to serve consistent reads, updated atomically
// readyForConsistentReads is used to track when the leader server is
// ready to serve consistent reads, after it has applied its initial
// barrier. This is updated atomically.
readyForConsistentReads int32
// leaveCh is used to signal that the server is leaving the cluster
// and trying to shed its RPC traffic onto other Consul servers. This
// is only ever closed.
leaveCh chan struct{}
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
router *router.Router
@ -302,6 +309,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, 32),
router: router.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(),
@ -783,6 +791,14 @@ func (s *Server) Leave() error {
}
}
// Start refusing RPCs now that we've left the LAN pool. It's important
// to do this *after* we've left the LAN pool so that clients will know
// to shift onto another server if they perform a retry. We also wake up
// all queries in the RPC retry state.
s.logger.Printf("[INFO] consul: Waiting %s to drain RPC traffic", s.config.LeaveDrainTime)
close(s.leaveCh)
time.Sleep(s.config.LeaveDrainTime)
// If we were not leader, wait to be safely removed from the cluster. We
// must wait to allow the raft replication to take place, otherwise an
// immediate shutdown could cause a loss of quorum.

View File

@ -93,6 +93,12 @@ func testServerConfig(t *testing.T) (string, *Config) {
config.Build = "0.8.0"
config.CoordinateUpdatePeriod = 100 * time.Millisecond
config.LeaveDrainTime = 1 * time.Millisecond
// TODO (slackpad) - We should be able to run all tests w/o this, but it
// looks like several depend on it.
config.RPCHoldTimeout = 5 * time.Second
return dir, config
}
@ -395,16 +401,16 @@ func TestServer_LeaveLeader(t *testing.T) {
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Issue a leave to the leader
var err error
var leader *Server
switch {
case s1.IsLeader():
err = s1.Leave()
leader = s1
case s2.IsLeader():
err = s2.Leave()
leader = s2
default:
t.Fatal("no leader")
}
if err != nil {
if err := leader.Leave(); err != nil {
t.Fatal("leave failed: ", err)
}
@ -433,16 +439,16 @@ func TestServer_Leave(t *testing.T) {
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Issue a leave to the non-leader
var err error
var nonleader *Server
switch {
case s1.IsLeader():
err = s2.Leave()
nonleader = s2
case s2.IsLeader():
err = s1.Leave()
nonleader = s1
default:
t.Fatal("no leader")
}
if err != nil {
if err := nonleader.Leave(); err != nil {
t.Fatal("leave failed: ", err)
}

View File

@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
@ -406,7 +407,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
// Get a usable client
conn, sc, err := p.getClient(dc, addr, version, useTLS)
if err != nil {
return fmt.Errorf("rpc error: %v", err)
return fmt.Errorf("rpc error getting client: %v", err)
}
// Make the RPC call
@ -418,12 +419,12 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
// about how we found this. The tldr is that if we see this
// error, we know this connection is toast, so we should clear
// it and make a new one on the next attempt.
if err == io.EOF {
if lib.IsErrEOF(err) {
p.clearConn(conn)
}
p.releaseConn(conn)
return fmt.Errorf("rpc error: %v", err)
return fmt.Errorf("rpc error making call: %v", err)
}
// Done with the connection

View File

@ -23,6 +23,10 @@ var (
ErrRPCRateExceeded = errors.New(errRPCRateExceeded)
)
func IsErrRPCRateExceeded(err error) bool {
return strings.Contains(err.Error(), errRPCRateExceeded)
func IsErrNoLeader(err error) bool {
return err != nil && strings.Contains(err.Error(), errNoLeader)
}
func IsErrRPCRateExceeded(err error) bool {
return err != nil && strings.Contains(err.Error(), errRPCRateExceeded)
}

27
lib/eof.go Normal file
View File

@ -0,0 +1,27 @@
package lib
import (
"io"
"strings"
"github.com/hashicorp/yamux"
)
var yamuxStreamClosed = yamux.ErrStreamClosed.Error()
var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error()
// IsErrEOF returns true if we get an EOF error from the socket itself, or
// an EOF equivalent error from yamux.
func IsErrEOF(err error) bool {
if err == io.EOF {
return true
}
errStr := err.Error()
if strings.Contains(errStr, yamuxStreamClosed) ||
strings.Contains(errStr, yamuxSessionShutdown) {
return true
}
return false
}

View File

@ -958,6 +958,12 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
Consul. See the [Server Performance](/docs/guides/performance.html) guide for more details. The
following parameters are available:
* <a name="leave_drain_time"></a><a href="#leave_drain_time">`leave_drain_time`</a> - A duration
that a server will dwell during a graceful leave in order to allow requests to be retried against
other Consul servers. Under normal circumstances, this can prevent clients from experiencing
"no leader" errors when performing a rolling update of the Consul servers. This was added in
Consul 1.0. Must be a duration value such as 10s. Defaults to 5s.
* <a name="raft_multiplier"></a><a href="#raft_multiplier">`raft_multiplier`</a> - An integer
multiplier used by Consul servers to scale key Raft timing parameters. Omitting this value
or setting it to 0 uses default timing described below. Lower values are used to tighten
@ -975,6 +981,11 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
See the note on [last contact](/docs/guides/performance.html#last-contact) timing for more
details on tuning this parameter. The maximum allowed value is 10.
* <a name="rpc_hold_timeout"></a><a href="#rpc_hold_timeout">`rpc_hold_timeout`</a> - A duration
that a client or server will retry internal RPC requests during leader elections. Under normal
circumstances, this can prevent clients from experiencing "no leader" errors. This was added in
Consul 1.0. Must be a duration value such as 10s. Defaults to 7s.
* <a name="ports"></a><a href="#ports">`ports`</a> This is a nested object that allows setting
the bind ports for the following keys:
* <a name="dns_port"></a><a href="#dns_port">`dns`</a> - The DNS server, -1 to disable. Default 8600.