diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go
index 14a99ec783..ee3b09b4f8 100644
--- a/consul/server_manager/server_manager.go
+++ b/consul/server_manager/server_manager.go
@@ -59,23 +59,23 @@ type ConsulClusterInfo interface {
 // serverCfg is the thread-safe configuration structure that is used to
 // maintain the list of consul servers in Client.
 //
-// NOTE(sean@): We are explicitly relying on the fact that this is copied.
-// Please keep this structure light.
+// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
+// be copied onto the stack. Please keep this structure light.
 type serverConfig struct {
-	// servers tracks the locally known servers
+	// servers tracks the locally known servers. List membership is
+	// maintained by Serf.
 	servers []*server_details.ServerDetails
 }
 
 type ServerManager struct {
-	// serverConfig provides the necessary load/store semantics to
-	// serverConfig
+	// serverConfig provides the necessary load/store semantics for the
+	// server list.
 	serverConfigValue atomic.Value
 	serverConfigLock  sync.Mutex
 
 	// shutdownCh is a copy of the channel in consul.Client
 	shutdownCh chan struct{}
 
-	// logger uses the provided LogOutput
 	logger *log.Logger
 
 	// serf is used to estimate the approximate number of nodes in a
@@ -89,8 +89,10 @@ type ServerManager struct {
 }
 
 // AddServer takes out an internal write lock and adds a new server. If the
-// server is not known, it adds the new server and schedules a rebalance. If
-// it is known, we merge the new server details.
+// server is not known, appends the server to the list. The new server will
+// begin seeing use after the rebalance timer fires or enough servers fail
+// organically. If the server is already known, merge the new server
+// details.
 func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
 	sm.serverConfigLock.Lock()
 	defer sm.serverConfigLock.Unlock()
@@ -130,8 +132,7 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
 func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
 	numServers := len(sc.servers)
 	if numServers < 2 {
-		// No action required
-		return servers
+		return servers // No action required
 	}
 
 	newServers := make([]*server_details.ServerDetails, 0, numServers)
@@ -141,7 +142,11 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
 }
 
 // FindHealthyServer takes out an internal "read lock" and searches through
-// the list of servers to find a healthy server.
+// the list of servers to find a "healthy" server. If the server is actually
+// unhealthy, we rely on Serf to detect this and remove the node from the
+// server list. If the server at the front of the list has failed or fails
+// during an RPC call, it is rotated to the end of the list. If there are no
+// servers available, return nil.
 func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
 	serverCfg := sm.getServerConfig()
 	numServers := len(serverCfg.servers)
@@ -149,7 +154,10 @@ func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
 		sm.logger.Printf("[ERR] consul: No servers found in the server config")
 		return nil
 	} else {
-		// Return whatever is at the front of the list
+		// Return whatever is at the front of the list because it is
+		// assumed to be the oldest in the server list (unless -
+		// hypothetically - the server list was rotated right after a
+		// server was added).
 		return serverCfg.servers[0]
 	}
 }
@@ -170,28 +178,29 @@ func (sm *ServerManager) getServerConfig() serverConfig {
 
 // NewServerManager is the only way to safely create a new ServerManager
 // struct.
-func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, cci ConsulClusterInfo) (sm *ServerManager) {
+func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
 	// NOTE(sean@): Can't pass *consul.Client due to an import cycle
 	sm = new(ServerManager)
 	sm.logger = logger
-	sm.clusterInfo = cci
+	sm.clusterInfo = clusterInfo
 	sm.shutdownCh = shutdownCh
 
 	sc := serverConfig{}
 	sc.servers = make([]*server_details.ServerDetails, 0)
-	sm.serverConfigValue.Store(sc)
+	sm.saveServerConfig(sc)
 	return sm
 }
 
-// NotifyFailedServer is an exported convenience function that allows callers
-// to pass in a server that has failed an RPC request and mark it as failed.
-// If the server being failed is not the first server on the list, this is a
-// noop. If, however, the server is failed and first on the list, acquire
-// the lock, retest, and take the penalty of moving the server to the end of
-// the list.
+// NotifyFailedServer marks the passed in server as "failed" by rotating it
+// to the end of the server list.
 func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
 	serverCfg := sm.getServerConfig()
 
+	// If the server being failed is not the first server on the list,
+	// this is a noop. If, however, the server is failed and first on
+	// the list, acquire the lock, retest, and take the penalty of moving
+	// the server to the end of the list.
+	// Use atomic.CAS to emulate a TryLock().
 	if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server &&
 		atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
@@ -212,10 +221,12 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
 
 // RebalanceServers takes out an internal write lock and shuffles the list of
 // servers on this agent. This allows for a redistribution of work across
-// consul servers and provides a guarantee that the order list of
-// ServerDetails isn't actually ordered, therefore we can sequentially walk
-// the array to pick a server without all agents in the cluster dog piling on
-// a single node.
+// consul servers and provides a guarantee that the order of the server list
+// isn't related to the age at which the node was added to the cluster.
+// Elsewhere we rely on the position in the server list as a hint regarding
+// the stability of a server relative to its position in the server list.
+// Servers at or near the front of the list are more stable than servers near
+// the end of the list.
 func (sm *ServerManager) RebalanceServers() {
 	sm.serverConfigLock.Lock()
 	defer sm.serverConfigLock.Unlock()
@@ -237,11 +248,7 @@ func (sm *ServerManager) RebalanceServers() {
 }
 
 // RemoveServer takes out an internal write lock and removes a server from
-// the server list. No rebalancing happens as a result of the removed server
-// because we do not want a network partition which separated a server from
-// this agent to cause an increase in work. Instead we rely on the internal
-// already existing semantics to handle failure detection after a server has
-// been removed.
+// the server list.
 func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 	sm.serverConfigLock.Lock()
 	defer sm.serverConfigLock.Unlock()
@@ -264,7 +271,9 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 	}
 }
 
-// refreshServerRebalanceTimer is called
+// refreshServerRebalanceTimer is only called once the rebalanceTimer
+// expires. Historically this was an expensive routine and is intended to be
+// run in isolation in a dedicated, non-concurrent task.
 func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
 	serverCfg := sm.getServerConfig()
 	numConsulServers := len(serverCfg.servers)
@@ -289,8 +298,11 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
 }
 
 // Start is used to start and manage the task of automatically shuffling and
-// rebalance the list of consul servers. This maintenance happens either
-// when a new server is added or when a duration has been exceed.
+// rebalancing the list of consul servers. This maintenance only happens
+// periodically based on the expiration of the timer. Failed servers are
+// automatically cycled to the end of the list. New servers are appended to
+// the list. The order of the server list must be shuffled periodically to
+// distribute load across all known and available consul servers.
 func (sm *ServerManager) Start() {
 	var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
 	var rebalanceTaskDispatched int32
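
For context, the pattern the comments in this patch rely on is a copy-on-write server list published through an atomic.Value, with an atomic.CompareAndSwapInt32 barrier standing in for a TryLock when a failed front server is rotated to the end. The minimal standalone sketch below is not part of this patch; the demoManager, demoServer, and demoConfig names are invented for illustration only, so the locking discussion around NotifyFailedServer and RebalanceServers is easier to follow.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// demoServer stands in for server_details.ServerDetails.
type demoServer struct{ Name string }

// demoConfig is the copy-on-write list; keep it light because callers
// receive a copy on every load.
type demoConfig struct {
	servers []*demoServer
}

type demoManager struct {
	configValue   atomic.Value // always holds a demoConfig
	configLock    sync.Mutex   // serializes writers
	failedBarrier int32        // CAS barrier emulating a TryLock
}

func newDemoManager() *demoManager {
	m := new(demoManager)
	m.configValue.Store(demoConfig{servers: []*demoServer{}})
	return m
}

// getConfig is the lock-free read path: callers get a snapshot copy.
func (m *demoManager) getConfig() demoConfig {
	return m.configValue.Load().(demoConfig)
}

// addServer copies the slice, appends, and publishes the new list.
func (m *demoManager) addServer(s *demoServer) {
	m.configLock.Lock()
	defer m.configLock.Unlock()

	cfg := m.getConfig()
	servers := make([]*demoServer, len(cfg.servers), len(cfg.servers)+1)
	copy(servers, cfg.servers)
	cfg.servers = append(servers, s)
	m.configValue.Store(cfg)
}

// notifyFailed rotates a failed front server to the end of the list.
// The CAS acts as a TryLock: concurrent callers that lose the race
// return immediately instead of blocking.
func (m *demoManager) notifyFailed(s *demoServer) {
	cfg := m.getConfig()
	if len(cfg.servers) == 0 || cfg.servers[0] != s {
		return // only the front server is rotated
	}
	if !atomic.CompareAndSwapInt32(&m.failedBarrier, 0, 1) {
		return // another caller is already rotating
	}
	defer atomic.StoreInt32(&m.failedBarrier, 0)

	m.configLock.Lock()
	defer m.configLock.Unlock()

	// Retest under the lock: the list may have changed since the load.
	cfg = m.getConfig()
	if len(cfg.servers) > 1 && cfg.servers[0] == s {
		rotated := make([]*demoServer, 0, len(cfg.servers))
		rotated = append(rotated, cfg.servers[1:]...)
		rotated = append(rotated, cfg.servers[0])
		cfg.servers = rotated
		m.configValue.Store(cfg)
	}
}

func main() {
	m := newDemoManager()
	s1, s2 := &demoServer{Name: "s1"}, &demoServer{Name: "s2"}
	m.addServer(s1)
	m.addServer(s2)
	m.notifyFailed(s1) // s1 moves to the back
	for _, s := range m.getConfig().servers {
		fmt.Println(s.Name) // prints s2, then s1
	}
}

Readers never block on the writer lock and always observe some consistent snapshot of the list, which is why serverConfig must stay small enough that copying it on every load remains cheap, as the NOTE in the patch points out.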