diff --git a/agent/agent.go b/agent/agent.go
index c63477670a..2d232fe551 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -750,6 +750,14 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.RPCMaxBurst = a.config.RPCMaxBurst
}
+ // RPC-related performance configs.
+ if a.config.RPCHoldTimeout > 0 {
+ base.RPCHoldTimeout = a.config.RPCHoldTimeout
+ }
+ if a.config.LeaveDrainTime > 0 {
+ base.LeaveDrainTime = a.config.LeaveDrainTime
+ }
+
// set the src address for outgoing rpc connections
// Use port 0 so that outgoing connections use a random port.
if !ipaddr.IsAny(base.RPCAddr.IP) {
diff --git a/agent/config/builder.go b/agent/config/builder.go
index ac6850c177..e665b53b53 100644
--- a/agent/config/builder.go
+++ b/agent/config/builder.go
@@ -587,6 +587,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
KeyFile: b.stringVal(c.KeyFile),
+ LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
LeaveOnTerm: leaveOnTerm,
LogLevel: b.stringVal(c.LogLevel),
NodeID: types.NodeID(b.stringVal(c.NodeID)),
@@ -596,6 +597,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
PidFile: b.stringVal(c.PidFile),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
+ RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
RPCProtocol: b.intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
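The two new settings arrive as optional duration strings and are converted by the builder's durationVal helper. As a rough, self-contained sketch of that conversion (the function name and error handling below are stand-ins, not the builder's actual implementation):

```go
package main

import (
	"fmt"
	"time"
)

// parseOptionalDuration stands in for the builder's durationVal helper: a
// nil pointer means the option was omitted (zero value), and a present
// string must parse as a Go duration such as "5s" or "7s".
func parseOptionalDuration(name string, v *string) (time.Duration, error) {
	if v == nil {
		return 0, nil
	}
	d, err := time.ParseDuration(*v)
	if err != nil {
		return 0, fmt.Errorf("%s: invalid duration %q: %v", name, *v, err)
	}
	return d, nil
}

func main() {
	s := "7s"
	d, err := parseOptionalDuration("performance.rpc_hold_timeout", &s)
	fmt.Println(d, err) // prints "7s <nil>"
}
```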
diff --git a/agent/config/config.go b/agent/config/config.go
index 19cd8ac84d..1f18e0931d 100644
--- a/agent/config/config.go
+++ b/agent/config/config.go
@@ -344,7 +344,9 @@ type HTTPConfig struct {
}
type Performance struct {
- RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
+ LeaveDrainTime *string `json:"leave_drain_time,omitempty" hcl:"leave_drain_time" mapstructure:"leave_drain_time"`
+ RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
+ RPCHoldTimeout *string `json:"rpc_hold_timeout,omitempty" hcl:"rpc_hold_timeout" mapstructure:"rpc_hold_timeout"`
}
type Telemetry struct {
diff --git a/agent/config/default.go b/agent/config/default.go
index 765232bbfc..6bc3295c42 100644
--- a/agent/config/default.go
+++ b/agent/config/default.go
@@ -65,7 +65,9 @@ func DefaultSource() Source {
rpc_max_burst = 1000
}
performance = {
+ leave_drain_time = "5s"
raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + `
+ rpc_hold_timeout = "7s"
}
ports = {
dns = 8600
diff --git a/agent/config/runtime.go b/agent/config/runtime.go
index ebbf88fe70..5fc2ab3618 100644
--- a/agent/config/runtime.go
+++ b/agent/config/runtime.go
@@ -146,6 +146,7 @@ type RuntimeConfig struct {
HTTPSAddrs []net.Addr
HTTPSPort int
KeyFile string
+ LeaveDrainTime time.Duration
LeaveOnTerm bool
LogLevel string
NodeID types.NodeID
@@ -154,6 +155,7 @@ type RuntimeConfig struct {
PidFile string
RPCAdvertiseAddr *net.TCPAddr
RPCBindAddr *net.TCPAddr
+ RPCHoldTimeout time.Duration
RPCMaxBurst int
RPCProtocol int
RPCRateLimit rate.Limit
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go
index ee6df1c497..c5699f052b 100644
--- a/agent/config/runtime_test.go
+++ b/agent/config/runtime_test.go
@@ -2104,7 +2104,9 @@ func TestFullConfig(t *testing.T) {
"node_name": "otlLxGaI",
"non_voting_server": true,
"performance": {
- "raft_multiplier": 5
+ "leave_drain_time": "8265s",
+ "raft_multiplier": 5,
+ "rpc_hold_timeout": "15707s"
},
"pid_file": "43xN80Km",
"ports": {
@@ -2535,7 +2537,9 @@ func TestFullConfig(t *testing.T) {
node_name = "otlLxGaI"
non_voting_server = true
performance {
+ leave_drain_time = "8265s"
raft_multiplier = 5
+ rpc_hold_timeout = "15707s"
}
pid_file = "43xN80Km"
ports {
@@ -3088,6 +3092,7 @@ func TestFullConfig(t *testing.T) {
HTTPSAddrs: []net.Addr{tcpAddr("95.17.17.19:15127")},
HTTPSPort: 15127,
KeyFile: "IEkkwgIA",
+ LeaveDrainTime: 8265 * time.Second,
LeaveOnTerm: true,
LogLevel: "k1zo9Spt",
NodeID: types.NodeID("AsUIlw99"),
@@ -3097,6 +3102,7 @@ func TestFullConfig(t *testing.T) {
PidFile: "43xN80Km",
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
+ RPCHoldTimeout: 15707 * time.Second,
RPCProtocol: 30793,
RPCRateLimit: 12029.43,
RPCMaxBurst: 44848,
@@ -3765,6 +3771,7 @@ func TestSanitize(t *testing.T) {
"HTTPSAddrs": [],
"HTTPSPort": 0,
"KeyFile": "hidden",
+ "LeaveDrainTime": "0s",
"LeaveOnTerm": false,
"LogLevel": "",
"NodeID": "",
@@ -3774,6 +3781,7 @@ func TestSanitize(t *testing.T) {
"PidFile": "",
"RPCAdvertiseAddr": "",
"RPCBindAddr": "",
+ "RPCHoldTimeout": "0s",
"RPCMaxBurst": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
diff --git a/agent/consul/client.go b/agent/consul/client.go
index b8258bc2f5..2e87b9ce36 100644
--- a/agent/consul/client.go
+++ b/agent/consul/client.go
@@ -233,6 +233,15 @@ func (c *Client) Encrypted() bool {
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
+ // This is subtle, but we start measuring the time on the client side
+ // right at the time of the first request, vs. on the first retry as
+ // is done on the server side inside forward(). This is because the
+ // servers may already be applying the RPCHoldTimeout up there, so by
+ // starting the timer here we won't potentially double up the delay.
+ // TODO (slackpad) Plumb a deadline here with a context.
+ firstCheck := time.Now()
+
+TRY:
server := c.routers.FindServer()
if server == nil {
return structs.ErrNoServers
@@ -248,13 +257,28 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
}
// Make the request.
- if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
- c.routers.NotifyFailedServer(server)
- c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
- return err
+ rpcErr := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply)
+ if rpcErr == nil {
+ return nil
}
- return nil
+ // Move off to another server, and see if we can retry.
+ c.logger.Printf("[ERR] consul: %q RPC failed to server %s: %v", method, server.Addr, rpcErr)
+ c.routers.NotifyFailedServer(server)
+ if retry := canRetry(args, rpcErr); !retry {
+ return rpcErr
+ }
+
+ // We can wait a bit and retry!
+ if time.Now().Sub(firstCheck) < c.config.RPCHoldTimeout {
+ jitter := lib.RandomStagger(c.config.RPCHoldTimeout / jitterFraction)
+ select {
+ case <-time.After(jitter):
+ goto TRY
+ case <-c.shutdownCh:
+ }
+ }
+ return rpcErr
}
// SnapshotRPC sends the snapshot request to one of the servers, reading from
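The interesting part of the client change is the shape of the retry loop: measure from the first attempt, mark the failed server, sleep a small random slice of the hold timeout, and try again until RPCHoldTimeout runs out or the client shuts down. A standalone sketch of that pattern, assuming a jitter fraction of 16 as suggested by the jitterFraction constant referenced above:

```go
package retrysketch

import (
	"math/rand"
	"time"
)

// callWithHoldTimeout mirrors the shape of Client.RPC above: retry a failed,
// retryable call with a small random backoff until the hold timeout has
// elapsed since the *first* attempt. The call func stands in for picking a
// server from the router and issuing the RPC over the connection pool.
func callWithHoldTimeout(call func() error, retryable func(error) bool,
	holdTimeout time.Duration, shutdownCh <-chan struct{}) error {

	firstCheck := time.Now()
	for {
		err := call()
		if err == nil || !retryable(err) {
			return err
		}
		if time.Since(firstCheck) >= holdTimeout {
			return err
		}

		// Sleep a random slice of the hold timeout so a burst of clients does
		// not retry in lockstep against the next server.
		window := holdTimeout / 16 // assumed jitterFraction of 16
		if window <= 0 {
			window = time.Millisecond
		}
		select {
		case <-time.After(time.Duration(rand.Int63n(int64(window)))):
			// try again, ideally against a different server
		case <-shutdownCh:
			return err
		}
	}
}
```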
diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go
index 684ced39ef..1958ba9fb0 100644
--- a/agent/consul/client_test.go
+++ b/agent/consul/client_test.go
@@ -180,6 +180,75 @@ func TestClient_RPC(t *testing.T) {
})
}
+type leaderFailer struct {
+ totalCalls int
+ onceCalls int
+}
+
+func (l *leaderFailer) Always(args struct{}, reply *struct{}) error {
+ l.totalCalls++
+ return structs.ErrNoLeader
+}
+
+func (l *leaderFailer) Once(args struct{}, reply *struct{}) error {
+ l.totalCalls++
+ l.onceCalls++
+
+ switch {
+ case l.onceCalls == 1:
+ return structs.ErrNoLeader
+
+ default:
+ return nil
+ }
+}
+
+func TestClient_RPC_Retry(t *testing.T) {
+ t.Parallel()
+
+ dir1, s1 := testServer(t)
+ defer os.RemoveAll(dir1)
+ defer s1.Shutdown()
+
+ dir2, c1 := testClientWithConfig(t, func(c *Config) {
+ c.Datacenter = "dc1"
+ c.NodeName = uniqueNodeName(t.Name())
+ c.RPCHoldTimeout = 2 * time.Second
+ })
+ defer os.RemoveAll(dir2)
+ defer c1.Shutdown()
+
+ joinLAN(t, c1, s1)
+ retry.Run(t, func(r *retry.R) {
+ var out struct{}
+ if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil {
+ r.Fatalf("err: %v", err)
+ }
+ })
+
+ failer := &leaderFailer{}
+ if err := s1.RegisterEndpoint("Fail", failer); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ var out struct{}
+ if err := c1.RPC("Fail.Always", struct{}{}, &out); !structs.IsErrNoLeader(err) {
+ t.Fatalf("err: %v", err)
+ }
+ if got, want := failer.totalCalls, 2; got < want {
+ t.Fatalf("got %d want >= %d", got, want)
+ }
+ if err := c1.RPC("Fail.Once", struct{}{}, &out); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if got, want := failer.onceCalls, 2; got < want {
+ t.Fatalf("got %d want >= %d", got, want)
+ }
+ if got, want := failer.totalCalls, 4; got < want {
+ t.Fatalf("got %d want >= %d", got, want)
+ }
+}
+
func TestClient_RPC_Pool(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
diff --git a/agent/consul/config.go b/agent/consul/config.go
index 1b904593e1..c37dd8d67d 100644
--- a/agent/consul/config.go
+++ b/agent/consul/config.go
@@ -329,6 +329,10 @@ type Config struct {
RPCRate rate.Limit
RPCMaxBurst int
+ // LeaveDrainTime is used to wait after a server has left the LAN Serf
+ // pool for RPCs to drain and new requests to be sent to other servers.
+ LeaveDrainTime time.Duration
+
// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
@@ -406,12 +410,6 @@ func DefaultConfig() *Config {
CoordinateUpdateBatchSize: 128,
CoordinateUpdateMaxBatches: 5,
- // This holds RPCs during leader elections. For the default Raft
- // config the election timeout is 5 seconds, so we set this a
- // bit longer to try to cover that period. This should be more
- // than enough when running in the high performance mode.
- RPCHoldTimeout: 7 * time.Second,
-
RPCRate: rate.Inf,
RPCMaxBurst: 1000,
diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go
index e083258912..9feed0199a 100644
--- a/agent/consul/rpc.go
+++ b/agent/consul/rpc.go
@@ -177,6 +177,24 @@ func (s *Server) handleSnapshotConn(conn net.Conn) {
}()
}
+// canRetry returns true if the given situation is safe for a retry.
+func canRetry(args interface{}, err error) bool {
+ // No leader errors are always safe to retry since no state could have
+ // been changed.
+ if structs.IsErrNoLeader(err) {
+ return true
+ }
+
+ // Reads are safe to retry for stream errors, such as if a server was
+ // being shut down.
+ info, ok := args.(structs.RPCInfo)
+ if ok && info.IsRead() && lib.IsErrEOF(err) {
+ return true
+ }
+
+ return false
+}
+
// forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
@@ -195,8 +213,15 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
}
CHECK_LEADER:
+ // Fail fast if we are in the process of leaving
+ select {
+ case <-s.leaveCh:
+ return true, structs.ErrNoLeader
+ default:
+ }
+
// Find the leader
- isLeader, remoteServer := s.getLeader()
+ isLeader, leader := s.getLeader()
// Handle the case we are the leader
if isLeader {
@@ -204,11 +229,17 @@ CHECK_LEADER:
}
// Handle the case of a known leader
- if remoteServer != nil {
- err := s.forwardLeader(remoteServer, method, args, reply)
- return true, err
+ rpcErr := structs.ErrNoLeader
+ if leader != nil {
+ rpcErr = s.connPool.RPC(s.config.Datacenter, leader.Addr,
+ leader.Version, method, leader.UseTLS, args, reply)
+ if rpcErr != nil && canRetry(info, rpcErr) {
+ goto RETRY
+ }
+ return true, rpcErr
}
+RETRY:
// Gate the request until there is a leader
if firstCheck.IsZero() {
firstCheck = time.Now()
@@ -218,12 +249,13 @@ CHECK_LEADER:
select {
case <-time.After(jitter):
goto CHECK_LEADER
+ case <-s.leaveCh:
case <-s.shutdownCh:
}
}
// No leader found and hold time exceeded
- return true, structs.ErrNoLeader
+ return true, rpcErr
}
// getLeader returns if the current node is the leader, and if not then it
@@ -248,15 +280,6 @@ func (s *Server) getLeader() (bool, *metadata.Server) {
return false, server
}
-// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
-func (s *Server) forwardLeader(server *metadata.Server, method string, args interface{}, reply interface{}) error {
- // Handle a missing server
- if server == nil {
- return structs.ErrNoLeader
- }
- return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply)
-}
-
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
manager, server, ok := s.router.FindRoute(dc)
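The retry policy itself is small but easy to misread: "no leader" errors are always safe to retry because nothing could have been committed, while EOF-style stream errors are retried only for reads, since a write may have been applied before the connection died. A self-contained illustration using local stand-in types (only the no-leader message text is borrowed from structs):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// request mirrors the one property of structs.RPCInfo that the retry
// decision needs: whether the call is a read.
type request struct{ read bool }

func (r request) IsRead() bool { return r.read }

// retryable reproduces the shape of canRetry: "no leader" errors always
// retry, EOF-equivalents retry only for idempotent reads, everything else
// is surfaced to the caller.
func retryable(req request, err error) bool {
	if err == nil {
		return false
	}
	if strings.Contains(err.Error(), "No cluster leader") {
		return true // same text check structs.IsErrNoLeader performs
	}
	return req.IsRead() && err == io.EOF
}

func main() {
	noLeader := errors.New("No cluster leader")
	fmt.Println(retryable(request{read: false}, noLeader)) // true: nothing was committed
	fmt.Println(retryable(request{read: true}, io.EOF))    // true: safe to re-issue a read
	fmt.Println(retryable(request{read: false}, io.EOF))   // false: the write may have landed
}
```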
diff --git a/agent/consul/server.go b/agent/consul/server.go
index ac85326dfe..09c13a8e7a 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -148,9 +148,16 @@ type Server struct {
// updated
reconcileCh chan serf.Member
- // used to track when the server is ready to serve consistent reads, updated atomically
+ // readyForConsistentReads is used to track when the leader server is
+ // ready to serve consistent reads, after it has applied its initial
+ // barrier. This is updated atomically.
readyForConsistentReads int32
+ // leaveCh is used to signal that the server is leaving the cluster
+ // and trying to shed its RPC traffic onto other Consul servers. This
+ // is only ever closed.
+ leaveCh chan struct{}
+
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
router *router.Router
@@ -302,6 +309,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
+ leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, 32),
router: router.NewRouter(logger, config.Datacenter),
rpcServer: rpc.NewServer(),
@@ -783,6 +791,14 @@ func (s *Server) Leave() error {
}
}
+ // Start refusing RPCs now that we've left the LAN pool. It's important
+ // to do this *after* we've left the LAN pool so that clients will know
+ // to shift onto another server if they perform a retry. We also wake up
+ // all queries in the RPC retry state.
+ s.logger.Printf("[INFO] consul: Waiting %s to drain RPC traffic", s.config.LeaveDrainTime)
+ close(s.leaveCh)
+ time.Sleep(s.config.LeaveDrainTime)
+
// If we were not leader, wait to be safely removed from the cluster. We
// must wait to allow the raft replication to take place, otherwise an
// immediate shutdown could cause a loss of quorum.
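Ordering matters in the leave path: the server leaves the LAN Serf pool first so clients stop selecting it, then closes leaveCh so any RPC parked in forward()'s retry loop fails fast instead of waiting out its hold timeout, and only then sleeps for LeaveDrainTime while traffic shifts to the remaining servers. A compressed sketch of that sequence, with a placeholder for the Serf steps:

```go
package leavesketch

import (
	"log"
	"time"
)

// drainOnLeave compresses the ordering used in Server.Leave above: broadcast
// the leave to the LAN pool first, then start refusing forwarded RPCs, then
// wait out the drain window. leaveSerf is a placeholder for the real Serf
// leave/broadcast steps.
func drainOnLeave(leaveSerf func(), leaveCh chan struct{}, drain time.Duration, logger *log.Logger) {
	// Leave LAN Serf first so clients learn to pick a different server.
	leaveSerf()

	// Closing leaveCh wakes every RPC gated in forward()'s retry loop so it
	// fails fast with ErrNoLeader and clients shift to another server.
	logger.Printf("[INFO] consul: Waiting %s to drain RPC traffic", drain)
	close(leaveCh)
	time.Sleep(drain)
}
```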
diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go
index 39b9644e8f..be347fe56c 100644
--- a/agent/consul/server_test.go
+++ b/agent/consul/server_test.go
@@ -93,6 +93,12 @@ func testServerConfig(t *testing.T) (string, *Config) {
config.Build = "0.8.0"
config.CoordinateUpdatePeriod = 100 * time.Millisecond
+ config.LeaveDrainTime = 1 * time.Millisecond
+
+ // TODO (slackpad) - We should be able to run all tests w/o this, but it
+ // looks like several depend on it.
+ config.RPCHoldTimeout = 5 * time.Second
+
return dir, config
}
@@ -395,16 +401,16 @@ func TestServer_LeaveLeader(t *testing.T) {
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Issue a leave to the leader
- var err error
+ var leader *Server
switch {
case s1.IsLeader():
- err = s1.Leave()
+ leader = s1
case s2.IsLeader():
- err = s2.Leave()
+ leader = s2
default:
t.Fatal("no leader")
}
- if err != nil {
+ if err := leader.Leave(); err != nil {
t.Fatal("leave failed: ", err)
}
@@ -433,16 +439,16 @@ func TestServer_Leave(t *testing.T) {
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Issue a leave to the non-leader
- var err error
+ var nonleader *Server
switch {
case s1.IsLeader():
- err = s2.Leave()
+ nonleader = s2
case s2.IsLeader():
- err = s1.Leave()
+ nonleader = s1
default:
t.Fatal("no leader")
}
- if err != nil {
+ if err := nonleader.Leave(); err != nil {
t.Fatal("leave failed: ", err)
}
diff --git a/agent/pool/pool.go b/agent/pool/pool.go
index 7fb55baebc..cdf6c6ec69 100644
--- a/agent/pool/pool.go
+++ b/agent/pool/pool.go
@@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"
+ "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
@@ -406,7 +407,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
// Get a usable client
conn, sc, err := p.getClient(dc, addr, version, useTLS)
if err != nil {
- return fmt.Errorf("rpc error: %v", err)
+ return fmt.Errorf("rpc error getting client: %v", err)
}
// Make the RPC call
@@ -418,12 +419,12 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
// about how we found this. The tldr is that if we see this
// error, we know this connection is toast, so we should clear
// it and make a new one on the next attempt.
- if err == io.EOF {
+ if lib.IsErrEOF(err) {
p.clearConn(conn)
}
p.releaseConn(conn)
- return fmt.Errorf("rpc error: %v", err)
+ return fmt.Errorf("rpc error making call: %v", err)
}
// Done with the connection
diff --git a/agent/structs/errors.go b/agent/structs/errors.go
index fcf6dafe92..66337d2e47 100644
--- a/agent/structs/errors.go
+++ b/agent/structs/errors.go
@@ -23,6 +23,10 @@ var (
ErrRPCRateExceeded = errors.New(errRPCRateExceeded)
)
-func IsErrRPCRateExceeded(err error) bool {
- return strings.Contains(err.Error(), errRPCRateExceeded)
+func IsErrNoLeader(err error) bool {
+ return err != nil && strings.Contains(err.Error(), errNoLeader)
+}
+
+func IsErrRPCRateExceeded(err error) bool {
+ return err != nil && strings.Contains(err.Error(), errRPCRateExceeded)
}
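Both helpers match on the error text because the concrete error value is flattened to a string when it crosses the RPC boundary, and the added nil checks make them safe to call directly on an RPC result. A minimal sketch of caller-side usage:

```go
package errsketch

import "github.com/hashicorp/consul/agent/structs"

// classify sketches caller-side use of the helpers: only the error text
// survives the RPC boundary, so callers match on the message instead of on
// sentinel error values.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case structs.IsErrNoLeader(err):
		return "no leader yet; safe to retry"
	case structs.IsErrRPCRateExceeded(err):
		return "rate limited; back off"
	default:
		return "unexpected failure"
	}
}
```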
diff --git a/lib/eof.go b/lib/eof.go
new file mode 100644
index 0000000000..f77844fd64
--- /dev/null
+++ b/lib/eof.go
@@ -0,0 +1,27 @@
+package lib
+
+import (
+ "io"
+ "strings"
+
+ "github.com/hashicorp/yamux"
+)
+
+var yamuxStreamClosed = yamux.ErrStreamClosed.Error()
+var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error()
+
+// IsErrEOF returns true if we get an EOF error from the socket itself, or
+// an EOF equivalent error from yamux.
+func IsErrEOF(err error) bool {
+ if err == io.EOF {
+ return true
+ }
+
+ errStr := err.Error()
+ if strings.Contains(errStr, yamuxStreamClosed) ||
+ strings.Contains(errStr, yamuxSessionShutdown) {
+ return true
+ }
+
+ return false
+}
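Because the connection pool wraps failures with fmt.Errorf (see the pool.go change above), IsErrEOF falls back to substring matching rather than comparing error values. A short companion test along these lines would pin that behavior down (the test itself is illustrative and not part of the patch):

```go
package lib

import (
	"fmt"
	"io"
	"testing"

	"github.com/hashicorp/yamux"
)

func TestIsErrEOF(t *testing.T) {
	// Raw EOF and yamux's own shutdown errors should be treated alike.
	if !IsErrEOF(io.EOF) || !IsErrEOF(yamux.ErrSessionShutdown) {
		t.Fatalf("expected EOF-equivalent errors to be detected")
	}

	// The connection pool wraps failures with fmt.Errorf, so only the yamux
	// text survives; the substring check still catches it.
	wrapped := fmt.Errorf("rpc error making call: %v", yamux.ErrStreamClosed)
	if !IsErrEOF(wrapped) {
		t.Fatalf("expected wrapped stream-closed error to count as EOF")
	}

	if IsErrEOF(fmt.Errorf("permission denied")) {
		t.Fatalf("unrelated errors must not look like EOF")
	}
}
```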
diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md
index fb4ea5ab9f..7eeaae208c 100644
--- a/website/source/docs/agent/options.html.md
+++ b/website/source/docs/agent/options.html.md
@@ -958,6 +958,12 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
Consul. See the [Server Performance](/docs/guides/performance.html) guide for more details. The
following parameters are available:
+ * `leave_drain_time` - A duration
+ for which a server will dwell during a graceful leave in order to allow requests to be retried against
+ other Consul servers. Under normal circumstances, this can prevent clients from experiencing
+ "no leader" errors when performing a rolling update of the Consul servers. This was added in
+ Consul 1.0. Must be a duration value such as 10s. Defaults to 5s.
+
* `raft_multiplier` - An integer
multiplier used by Consul servers to scale key Raft timing parameters. Omitting this value
or setting it to 0 uses default timing described below. Lower values are used to tighten
@@ -975,6 +981,11 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
See the note on [last contact](/docs/guides/performance.html#last-contact) timing for more
details on tuning this parameter. The maximum allowed value is 10.
+ * `rpc_hold_timeout` - A duration
+ for which a client or server will retry internal RPC requests during leader elections. Under normal
+ circumstances, this can prevent clients from experiencing "no leader" errors. This was added in
+ Consul 1.0. Must be a duration value such as 10s. Defaults to 7s.
+
* `ports` This is a nested object that allows setting
the bind ports for the following keys:
* `dns` - The DNS server, -1 to disable. Default 8600.