From be7893082ccfc48d1ae985d1a5ae71c3605487d5 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 10 Jul 2016 13:24:06 -0400 Subject: [PATCH] consul: Refactor forward to hold RPC when no leader is known --- consul/rpc.go | 57 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/consul/rpc.go b/consul/rpc.go index 6105e3ae55..fc040b22cc 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -10,6 +10,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" @@ -39,7 +40,8 @@ const ( // jitterFraction is a the limit to the amount of jitter we apply // to a user specified MaxQueryTime. We divide the specified time by - // the fraction. So 16 == 6.25% limit of jitter + // the fraction. So 16 == 6.25% limit of jitter. This same fraction + // is applied to the RPCHoldTimeout. jitterFraction = 16 // Warn if the Raft command is larger than this. 
@@ -189,6 +191,8 @@ func (s *Server) handleConsulConn(conn net.Conn) { // forward is used to forward to a remote DC or to forward to the local leader // Returns a bool of if forwarding was performed, as well as any error func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { + var firstCheck time.Time + // Handle DC forwarding dc := info.RequestDatacenter() if dc != s.config.Datacenter { @@ -201,20 +205,51 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, return false, nil } - // Handle leader forwarding - if !s.IsLeader() { - err := s.forwardLeader(method, args, reply) +CHECK_LEADER: + // Find the leader + isLeader, remoteServer := s.getLeader() + + // Handle the case where we are the leader + if isLeader { + return false, nil + } + + // Handle the case of a known leader + if remoteServer != nil { + err := s.forwardLeader(remoteServer, method, args, reply) return true, err } - return false, nil + + // Gate the request until there is a leader + if firstCheck.IsZero() { + firstCheck = time.Now() + } + if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout { + jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) + select { + case <-time.After(jitter): + goto CHECK_LEADER + case <-s.shutdownCh: + } + } + + // No leader found and hold time exceeded + return true, structs.ErrNoLeader } -// forwardLeader is used to forward an RPC call to the leader, or fail if no leader -func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error { +// getLeader returns whether the current node is the leader, and if not +// then it returns the leader, which is potentially nil if the cluster +// has not yet elected a leader. 
+func (s *Server) getLeader() (bool, *agent.Server) { + // Check if we are the leader + if s.IsLeader() { + return true, nil + } + // Get the leader leader := s.raft.Leader() if leader == "" { - return structs.ErrNoLeader + return false, nil } // Lookup the server @@ -222,6 +257,12 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{ server := s.localConsuls[leader] s.localLock.RUnlock() + // Server could be nil + return false, server +} + +// forwardLeader is used to forward an RPC call to the leader, or fail if no leader +func (s *Server) forwardLeader(server *agent.Server, method string, args interface{}, reply interface{}) error { // Handle a missing server if server == nil { return structs.ErrNoLeader