Merge pull request #2131 from hashicorp/b-misc-microoptimizations

Misc micro optimizations

Commit aa1bb5a012
@@ -1048,7 +1048,7 @@ func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error
 	check, ok := a.checkTTLs[checkID]
 	if !ok {
-		return fmt.Errorf("CheckID does not have associated TTL")
+		return fmt.Errorf("CheckID %q does not have associated TTL", checkID)
 	}

 	// Set the status through CheckTTL to reset the TTL
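The new message uses the %q verb so the offending ID is quoted in the output, which makes typos, empty strings, and whitespace-mangled IDs visible to operators. A minimal before/after sketch (the checkID value here is made up):

package main

import "fmt"

func main() {
	checkID := "web-ttl"

	// Before: the operator cannot tell which check was missing a TTL.
	fmt.Println(fmt.Errorf("CheckID does not have associated TTL"))

	// After: %q quotes the ID, so the failing check stands out.
	fmt.Println(fmt.Errorf("CheckID %q does not have associated TTL", checkID))
	// Output: CheckID "web-ttl" does not have associated TTL
}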
@@ -154,9 +154,9 @@ type logRecord struct {
 type Member struct {
 	Name        string
 	Addr        net.IP
-	Port        uint16
 	Tags        map[string]string
 	Status      string
+	Port        uint16
 	ProtocolMin uint8
 	ProtocolMax uint8
 	ProtocolCur uint8
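Moving Port down next to the other small integer fields is a struct-packing micro-optimization: Go lays fields out in declaration order, so grouping the narrow fields lets them share alignment padding instead of each forcing its own. On amd64 this reordering plausibly trims 8 bytes per Member. A self-contained sketch of the effect (the field names and sizes are illustrative):

package main

import (
	"fmt"
	"unsafe"
)

// Interleaving narrow and wide fields forces padding before every
// 8-byte-aligned field.
type interleaved struct {
	a uint8  // 1 byte + 7 bytes padding
	b uint64 // 8 bytes
	c uint8  // 1 byte + 7 bytes padding
	d uint64 // 8 bytes
}

// Grouping the narrow fields lets them share one word.
type grouped struct {
	b uint64 // 8 bytes
	d uint64 // 8 bytes
	a uint8  // 1 byte
	c uint8  // 1 byte + 6 bytes trailing padding
}

func main() {
	fmt.Println(unsafe.Sizeof(interleaved{})) // 32 on amd64
	fmt.Println(unsafe.Sizeof(grouped{}))     // 24 on amd64
}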
@@ -241,7 +241,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
 	}

 	// Select a random addr
-	offset := rand.Int31() % int32(len(servers))
+	offset := rand.Int31n(int32(len(servers)))
 	server := servers[offset]
 	s.remoteLock.RUnlock()

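rand.Int31n is the better call on two counts: it avoids the modulo bias that rand.Int31() % n carries whenever 1<<31 is not evenly divisible by n, and it states the intent directly. One caveat worth knowing: Int31n panics when its argument is <= 0, which the surrounding code presumably rules out before this point. A small sketch of the two forms:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	servers := []string{"a", "b", "c"}

	// Slightly biased: low remainders of Int31() % 3 come up
	// marginally more often because 1<<31 is not a multiple of 3.
	biased := servers[rand.Int31()%int32(len(servers))]

	// Uniform: Int31n resamples internally so every index is
	// equally likely. It panics if its argument is <= 0.
	fair := servers[rand.Int31n(int32(len(servers)))]

	fmt.Println(biased, fair)
}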
@@ -222,7 +222,7 @@ func NewServer(config *Config) (*Server, error) {
 		localConsuls:  make(map[string]*agent.Server),
 		logger:        logger,
 		reconcileCh:   make(chan serf.Member, 32),
-		remoteConsuls: make(map[string][]*agent.Server),
+		remoteConsuls: make(map[string][]*agent.Server, 4),
 		rpcServer:     rpc.NewServer(),
 		rpcTLS:        incomingTLS,
 		tombstoneGC:   gc,
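The second argument to make is a capacity hint: Go reserves bucket space for roughly that many entries up front, so the map does not grow and rehash while the first few datacenters are registered. The hint is not a limit, and 4 presumably reflects a typical federation size. A minimal sketch:

package main

import "fmt"

func main() {
	// Starts at the minimum size and grows as entries arrive.
	unsized := make(map[string][]string)

	// Reserves room for ~4 entries up front; inserting more than 4
	// still works, the map just grows past the hint.
	sized := make(map[string][]string, 4)

	for _, dc := range []string{"dc1", "dc2", "dc3", "dc4", "dc5"} {
		unsized[dc] = nil
		sized[dc] = nil
	}
	fmt.Println(len(unsized), len(sized)) // 5 5
}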
@@ -50,14 +50,14 @@ const (
 	newRebalanceConnsPerSecPerServer = 64
 )

-// ConsulClusterInfo is an interface wrapper around serf and prevents a
-// cyclic import dependency
+// ConsulClusterInfo is an interface wrapper around serf in order to prevent
+// a cyclic import dependency.
 type ConsulClusterInfo interface {
 	NumNodes() int
 }

-// Pinger is an interface wrapping client.ConnPool to prevent a
-// cyclic import dependency
+// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
+// dependency.
 type Pinger interface {
 	PingConsulServer(s *agent.Server) (bool, error)
 }
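Both interfaces exist for the architectural reason the comments name: Go forbids import cycles, so rather than import the packages that own serf state and the connection pool, this package declares the one method it actually needs and lets the concrete types satisfy it implicitly. A minimal sketch of the pattern (package and type names here are illustrative, not the real layout):

// Package servers needs a node count but must not import the package
// that owns the cluster state, or the two would form an import cycle.
// A locally declared one-method interface breaks the edge: the concrete
// type satisfies it implicitly, with no import in either direction.
package servers

type ClusterInfo interface {
	NumNodes() int
}

type Manager struct {
	clusterInfo ClusterInfo // any value with NumNodes() int
}

func New(ci ClusterInfo) *Manager {
	return &Manager{clusterInfo: ci}
}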
@@ -269,8 +269,8 @@ func (m *Manager) NumServers() int {
 // fail for a particular server are rotated to the end of the list. This
 // method reshuffles the list periodically in order to redistribute work
 // across all known consul servers (i.e. guarantee that the order of servers
-// in the server list isn't positively correlated with the age of a server in
-// the consul cluster). Periodically shuffling the server list prevents
+// in the server list is not positively correlated with the age of a server
+// in the Consul cluster). Periodically shuffling the server list prevents
 // long-lived clients from fixating on long-lived servers.
 //
 // Unhealthy servers are removed when serf notices the server has been
@@ -280,16 +280,16 @@ func (m *Manager) RebalanceServers() {
 	// Obtain a copy of the current serverList
 	l := m.getServerList()

-	// Early abort if there is no value to shuffling
+	// Early abort if there is nothing to shuffle
 	if len(l.servers) < 2 {
 		return
 	}

 	l.shuffleServers()

-	// Iterate through the shuffled server list to find a healthy server.
-	// Don't iterate on the list directly, this loop mutates the server
-	// list.
+	// Iterate through the shuffled server list to find an assumed
+	// healthy server. NOTE: Do not iterate on the list directly because
+	// this loop mutates the server list in-place.
 	var foundHealthyServer bool
 	for i := 0; i < len(l.servers); i++ {
 		// Always test the first server. Failed servers are cycled
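The reworded comment warns about a real hazard: the loop probes l.servers[0] on every pass and, on failure, rotates that entry to the back, so ranging over a snapshot of the slice would test stale entries. A simplified sketch of the rotate-and-probe pattern (ping stands in for the Pinger interface, and rand.Shuffle stands in for the manager's own shuffleServers):

package main

import (
	"fmt"
	"math/rand"
)

// rebalance shuffles the candidates, then repeatedly probes the head of
// the list, cycling each failed server to the back. The slice is mutated
// while the loop runs, which is why it is indexed directly rather than
// ranged over.
func rebalance(servers []string, ping func(string) bool) ([]string, bool) {
	rand.Shuffle(len(servers), func(i, j int) {
		servers[i], servers[j] = servers[j], servers[i]
	})
	for i := 0; i < len(servers); i++ {
		if ping(servers[0]) {
			return servers, true // the head is the new preferred server
		}
		// Cycle the failed server to the end of the list.
		servers = append(servers[1:], servers[0])
	}
	return servers, false
}

func main() {
	servers, ok := rebalance([]string{"s1", "s2", "s3"}, func(s string) bool {
		return s != "s2" // pretend s2 is down
	})
	fmt.Println(ok, servers)
}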
@@ -320,8 +320,7 @@ func (m *Manager) RebalanceServers() {
 			// reconcileServerList failed because Serf removed the server
 			// that was at the front of the list that had successfully
 			// been Ping'ed. Between the Ping and reconcile, a Serf
-			// event had shown up removing the node. Prevent an RPC
-			// timeout by retrying RebalanceServers().
+			// event had shown up removing the node.
 			//
 			// Instead of doing any heroics, "freeze in place" and
 			// continue to use the existing connection until the next
@@ -332,9 +331,9 @@ func (m *Manager) RebalanceServers() {
 }

 // reconcileServerList returns true when the first server in serverList
-// exists in the receiver's serverList. If true, the merged serverList
-// is stored as the receiver's serverList. Returns false if the first
-// server does not exist in the list (i.e. was removed by Serf during a
+// exists in the receiver's serverList. If true, the merged serverList is
+// stored as the receiver's serverList. Returns false if the first server
+// does not exist in the list (i.e. was removed by Serf during a
 // PingConsulServer() call. Newly added servers are appended to the list and
 // other missing servers are removed from the list.
 func (m *Manager) reconcileServerList(l *serverList) bool {
@@ -346,7 +345,7 @@ func (m *Manager) reconcileServerList(l *serverList) bool {
 	newServerCfg := m.getServerList()

 	// If Serf has removed all nodes, or there is no selected server
-	// (zero nodes in l), abort early.
+	// (zero nodes in serverList), abort early.
 	if len(newServerCfg.servers) == 0 || len(l.servers) == 0 {
 		return false
 	}
@@ -423,12 +422,12 @@ func (m *Manager) RemoveServer(s *agent.Server) {
 // refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
 func (m *Manager) refreshServerRebalanceTimer() time.Duration {
 	l := m.getServerList()
-	numConsulServers := len(l.servers)
+	numServers := len(l.servers)
 	// Limit this connection's life based on the size (and health) of the
 	// cluster. Never rebalance a connection more frequently than
 	// connReuseLowWatermarkDuration, and make sure we never exceed
 	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
-	clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
+	clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
 	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
 	numLANMembers := m.clusterInfo.NumNodes()
 	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
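The rename aside, the math here caps cluster-wide churn: with numServers servers each budgeted newRebalanceConnsPerSecPerServer (64) reconnects per second, the per-client interval must stretch as the LAN grows so the aggregate rate stays flat, while never dropping below the jittered low watermark. A sketch of what a rate-scaled interval can look like (an illustration of the idea, not a copy of lib.RateScaledInterval):

package main

import (
	"fmt"
	"time"
)

// rateScaledInterval returns how long each of n clients should wait
// between rebalances so that, collectively, they stay under rate
// operations per second, never waiting less than min.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(n) / rate * float64(time.Second))
	if interval < min {
		return min
	}
	return interval
}

func main() {
	// 3 servers * 64 conns/sec/server = 192 cluster-wide ops/sec.
	rate := float64(3 * 64)
	min := 2 * time.Minute

	fmt.Println(rateScaledInterval(rate, min, 100))    // small LAN: clamped to 2m0s
	fmt.Println(rateScaledInterval(rate, min, 100000)) // large LAN: ~8m41s between rebalances
}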
@@ -437,8 +436,8 @@ func (m *Manager) refreshServerRebalanceTimer() time.Duration {
 	return connRebalanceTimeout
 }

-// ResetRebalanceTimer resets the rebalance timer. This method primarily
-// exists for testing and should not be used directly.
+// ResetRebalanceTimer resets the rebalance timer. This method exists for
+// testing and should not be used directly.
 func (m *Manager) ResetRebalanceTimer() {
 	m.listLock.Lock()
 	defer m.listLock.Unlock()
@@ -446,11 +445,11 @@ func (m *Manager) ResetRebalanceTimer() {
 }

 // Start is used to start and manage the task of automatically shuffling and
-// rebalancing the list of consul servers. This maintenance only happens
+// rebalancing the list of Consul servers. This maintenance only happens
 // periodically based on the expiration of the timer. Failed servers are
 // automatically cycled to the end of the list. New servers are appended to
 // the list. The order of the server list must be shuffled periodically to
-// distribute load across all known and available consul servers.
+// distribute load across all known and available Consul servers.
 func (m *Manager) Start() {
 	for {
 		select {
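Start, per its comment, is a timer-driven maintenance loop: block until the rebalance timer fires, do the shuffle, then re-arm the timer from refreshServerRebalanceTimer. A generic sketch of that loop shape (function and channel names are illustrative):

package main

import (
	"fmt"
	"time"
)

// run blocks on a timer, performs the maintenance work when it fires,
// and re-arms the timer with a freshly computed interval, until asked
// to shut down.
func run(interval func() time.Duration, rebalance func(), shutdownCh <-chan struct{}) {
	timer := time.NewTimer(interval())
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			rebalance()             // shuffle and probe the server list
			timer.Reset(interval()) // re-arm for the next cycle
		case <-shutdownCh:
			return
		}
	}
}

func main() {
	shutdownCh := make(chan struct{})
	go run(
		func() time.Duration { return 10 * time.Millisecond },
		func() { fmt.Println("rebalancing") },
		shutdownCh,
	)
	time.Sleep(35 * time.Millisecond)
	close(shutdownCh)
}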
@@ -26,7 +26,7 @@ import (
 )

 func TestMain(t *testing.T) {
-	// Create a server
+	// Create a test Consul server
 	srv1 := testutil.NewTestServer(t)
 	defer srv1.Stop()
