mirror of https://github.com/status-im/consul.git
consul: Refactor forward to hold RPC when no leader is known
This commit is contained in:
parent
26ca78936c
commit
be7893082c
|
@ -10,6 +10,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/consul/agent"
|
||||||
"github.com/hashicorp/consul/consul/state"
|
"github.com/hashicorp/consul/consul/state"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
|
@ -39,7 +40,8 @@ const (
|
||||||
|
|
||||||
// jitterFraction is a the limit to the amount of jitter we apply
|
// jitterFraction is a the limit to the amount of jitter we apply
|
||||||
// to a user specified MaxQueryTime. We divide the specified time by
|
// to a user specified MaxQueryTime. We divide the specified time by
|
||||||
// the fraction. So 16 == 6.25% limit of jitter
|
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
|
||||||
|
// is applied to the RPCHoldTimeout
|
||||||
jitterFraction = 16
|
jitterFraction = 16
|
||||||
|
|
||||||
// Warn if the Raft command is larger than this.
|
// Warn if the Raft command is larger than this.
|
||||||
|
@ -189,6 +191,8 @@ func (s *Server) handleConsulConn(conn net.Conn) {
|
||||||
// forward is used to forward to a remote DC or to forward to the local leader
|
// forward is used to forward to a remote DC or to forward to the local leader
|
||||||
// Returns a bool of if forwarding was performed, as well as any error
|
// Returns a bool of if forwarding was performed, as well as any error
|
||||||
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
|
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
|
||||||
|
var firstCheck time.Time
|
||||||
|
|
||||||
// Handle DC forwarding
|
// Handle DC forwarding
|
||||||
dc := info.RequestDatacenter()
|
dc := info.RequestDatacenter()
|
||||||
if dc != s.config.Datacenter {
|
if dc != s.config.Datacenter {
|
||||||
|
@ -201,20 +205,51 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle leader forwarding
|
CHECK_LEADER:
|
||||||
if !s.IsLeader() {
|
// Find the leader
|
||||||
err := s.forwardLeader(method, args, reply)
|
isLeader, remoteServer := s.getLeader()
|
||||||
return true, err
|
|
||||||
}
|
// Handle the case we are the leader
|
||||||
|
if isLeader {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
|
// Handle the case of a known leader
|
||||||
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
|
if remoteServer != nil {
|
||||||
|
err := s.forwardLeader(remoteServer, method, args, reply)
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gate the request until there is a leader
|
||||||
|
if firstCheck.IsZero() {
|
||||||
|
firstCheck = time.Now()
|
||||||
|
}
|
||||||
|
if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout {
|
||||||
|
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
|
||||||
|
select {
|
||||||
|
case <-time.After(jitter):
|
||||||
|
goto CHECK_LEADER
|
||||||
|
case <-s.shutdownCh:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No leader found and hold time exceeded
|
||||||
|
return true, structs.ErrNoLeader
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLeader returns if the current node is the leader, and if not
|
||||||
|
// then it returns the leader which is potentially nil if the cluster
|
||||||
|
// has not yet elected a leader.
|
||||||
|
func (s *Server) getLeader() (bool, *agent.Server) {
|
||||||
|
// Check if we are the leader
|
||||||
|
if s.IsLeader() {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get the leader
|
// Get the leader
|
||||||
leader := s.raft.Leader()
|
leader := s.raft.Leader()
|
||||||
if leader == "" {
|
if leader == "" {
|
||||||
return structs.ErrNoLeader
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lookup the server
|
// Lookup the server
|
||||||
|
@ -222,6 +257,12 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{
|
||||||
server := s.localConsuls[leader]
|
server := s.localConsuls[leader]
|
||||||
s.localLock.RUnlock()
|
s.localLock.RUnlock()
|
||||||
|
|
||||||
|
// Server could be nil
|
||||||
|
return false, server
|
||||||
|
}
|
||||||
|
|
||||||
|
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
|
||||||
|
func (s *Server) forwardLeader(server *agent.Server, method string, args interface{}, reply interface{}) error {
|
||||||
// Handle a missing server
|
// Handle a missing server
|
||||||
if server == nil {
|
if server == nil {
|
||||||
return structs.ErrNoLeader
|
return structs.ErrNoLeader
|
||||||
|
|
Loading…
Reference in New Issue