Update comments to reflect reality

This commit is contained in:
Sean Chittenden 2016-02-24 19:51:56 -08:00
parent d2d55f4bb0
commit bc62de541c
1 changed files with 45 additions and 33 deletions

View File

@ -59,23 +59,23 @@ type ConsulClusterInfo interface {
// serverCfg is the thread-safe configuration structure that is used to // serverCfg is the thread-safe configuration structure that is used to
// maintain the list of consul servers in Client. // maintain the list of consul servers in Client.
// //
// NOTE(sean@): We are explicitly relying on the fact that this is copied. // NOTE(sean@): We are explicitly relying on the fact that serverConfig will
// Please keep this structure light. // be copied onto the stack. Please keep this structure light.
type serverConfig struct { type serverConfig struct {
// servers tracks the locally known servers // servers tracks the locally known servers. List membership is
// maintained by Serf.
servers []*server_details.ServerDetails servers []*server_details.ServerDetails
} }
type ServerManager struct { type ServerManager struct {
// serverConfig provides the necessary load/store semantics to // serverConfig provides the necessary load/store semantics for the
// serverConfig // server list.
serverConfigValue atomic.Value serverConfigValue atomic.Value
serverConfigLock sync.Mutex serverConfigLock sync.Mutex
// shutdownCh is a copy of the channel in consul.Client // shutdownCh is a copy of the channel in consul.Client
shutdownCh chan struct{} shutdownCh chan struct{}
// logger uses the provided LogOutput
logger *log.Logger logger *log.Logger
// serf is used to estimate the approximate number of nodes in a // serf is used to estimate the approximate number of nodes in a
@ -89,8 +89,10 @@ type ServerManager struct {
} }
// AddServer takes out an internal write lock and adds a new server. If the // AddServer takes out an internal write lock and adds a new server. If the
// server is not known, it adds the new server and schedules a rebalance. If // server is not known, appends the server to the list. The new server will
// it is known, we merge the new server details. // begin seeing use after the rebalance timer fires or enough servers fail
// organically. If the server is already known, merge the new server
// details.
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
sm.serverConfigLock.Lock() sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock() defer sm.serverConfigLock.Unlock()
@ -130,8 +132,7 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
numServers := len(sc.servers) numServers := len(sc.servers)
if numServers < 2 { if numServers < 2 {
// No action required return servers // No action required
return servers
} }
newServers := make([]*server_details.ServerDetails, 0, numServers) newServers := make([]*server_details.ServerDetails, 0, numServers)
@ -141,7 +142,11 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
} }
// FindHealthyServer takes out an internal "read lock" and searches through // FindHealthyServer takes out an internal "read lock" and searches through
// the list of servers to find a healthy server. // the list of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil.
func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails { func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
serverCfg := sm.getServerConfig() serverCfg := sm.getServerConfig()
numServers := len(serverCfg.servers) numServers := len(serverCfg.servers)
@ -149,7 +154,10 @@ func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
sm.logger.Printf("[ERR] consul: No servers found in the server config") sm.logger.Printf("[ERR] consul: No servers found in the server config")
return nil return nil
} else { } else {
// Return whatever is at the front of the list // Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
return serverCfg.servers[0] return serverCfg.servers[0]
} }
} }
@ -170,28 +178,29 @@ func (sm *ServerManager) getServerConfig() serverConfig {
// NewServerManager is the only way to safely create a new ServerManager // NewServerManager is the only way to safely create a new ServerManager
// struct. // struct.
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, cci ConsulClusterInfo) (sm *ServerManager) { func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
// NOTE(sean@): Can't pass *consul.Client due to an import cycle // NOTE(sean@): Can't pass *consul.Client due to an import cycle
sm = new(ServerManager) sm = new(ServerManager)
sm.logger = logger sm.logger = logger
sm.clusterInfo = cci sm.clusterInfo = clusterInfo
sm.shutdownCh = shutdownCh sm.shutdownCh = shutdownCh
sc := serverConfig{} sc := serverConfig{}
sc.servers = make([]*server_details.ServerDetails, 0) sc.servers = make([]*server_details.ServerDetails, 0)
sm.serverConfigValue.Store(sc) sm.saveServerConfig(sc)
return sm return sm
} }
// NotifyFailedServer is an exported convenience function that allows callers // NotifyFailedServer marks the passed in server as "failed" by rotating it
// to pass in a server that has failed an RPC request and mark it as failed. // to the end of the server list.
// If the server being failed is not the first server on the list, this is a
// noop. If, however, the server is failed and first on the list, acquire
// the lock, retest, and take the penalty of moving the server to the end of
// the list.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
serverCfg := sm.getServerConfig() serverCfg := sm.getServerConfig()
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, acquire the lock, retest, and take the penalty of moving
// the server to the end of the list.
// Use atomic.CAS to emulate a TryLock(). // Use atomic.CAS to emulate a TryLock().
if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server && if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server &&
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) { atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
@ -212,10 +221,12 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
// RebalanceServers takes out an internal write lock and shuffles the list of // RebalanceServers takes out an internal write lock and shuffles the list of
// servers on this agent. This allows for a redistribution of work across // servers on this agent. This allows for a redistribution of work across
// consul servers and provides a guarantee that the order list of // consul servers and provides a guarantee that the order of the server list
// ServerDetails isn't actually ordered, therefore we can sequentially walk // isn't related to the age at which the node was added to the cluster.
// the array to pick a server without all agents in the cluster dog piling on // Elsewhere we rely on the position in the server list as a hint regarding
// a single node. // the stability of a server relative to its position in the server list.
// Servers at or near the front of the list are more stable than servers near
// the end of the list.
func (sm *ServerManager) RebalanceServers() { func (sm *ServerManager) RebalanceServers() {
sm.serverConfigLock.Lock() sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock() defer sm.serverConfigLock.Unlock()
@ -237,11 +248,7 @@ func (sm *ServerManager) RebalanceServers() {
} }
// RemoveServer takes out an internal write lock and removes a server from // RemoveServer takes out an internal write lock and removes a server from
// the server list. No rebalancing happens as a result of the removed server // the server list.
// because we do not want a network partition which separated a server from
// this agent to cause an increase in work. Instead we rely on the internal
// already existing semantics to handle failure detection after a server has
// been removed.
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
sm.serverConfigLock.Lock() sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock() defer sm.serverConfigLock.Unlock()
@ -264,7 +271,9 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
} }
} }
// refreshServerRebalanceTimer is called // refreshServerRebalanceTimer is only called once the rebalanceTimer
// expires. Historically this was an expensive routine and is intended to be
// run in isolation in a dedicated, non-concurrent task.
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) { func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
serverCfg := sm.getServerConfig() serverCfg := sm.getServerConfig()
numConsulServers := len(serverCfg.servers) numConsulServers := len(serverCfg.servers)
@ -289,8 +298,11 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
} }
// Start is used to start and manage the task of automatically shuffling and // Start is used to start and manage the task of automatically shuffling and
// rebalance the list of consul servers. This maintenance happens either // rebalancing the list of consul servers. This maintenance only happens
// when a new server is added or when a duration has been exceed. // periodically based on the expiration of the timer. Failed servers are
// automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available consul servers.
func (sm *ServerManager) Start() { func (sm *ServerManager) Start() {
var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour)) var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
var rebalanceTaskDispatched int32 var rebalanceTaskDispatched int32