Rename serverCfg to sc for consistency

Sean Chittenden 2016-03-28 11:43:17 -07:00
parent 664ab238fd
commit 8ac35d84f0
1 changed file with 44 additions and 44 deletions


@@ -59,7 +59,7 @@ type ConnPoolPinger interface {
     PingConsulServer(server *server_details.ServerDetails) bool
 }
-// serverCfg is the thread-safe configuration struct used to maintain the
+// serverConfig is the thread-safe configuration struct used to maintain the
 // list of Consul servers in ServerManager.
 //
 // NOTE(sean@): We are explicitly relying on the fact that serverConfig will
@@ -107,20 +107,20 @@ type ServerManager struct {
 func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
     sm.serverConfigLock.Lock()
     defer sm.serverConfigLock.Unlock()
-    serverCfg := sm.getServerConfig()
+    sc := sm.getServerConfig()
     // Check if this server is known
     found := false
-    for idx, existing := range serverCfg.servers {
+    for idx, existing := range sc.servers {
         if existing.Name == server.Name {
-            newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
-            copy(newServers, serverCfg.servers)
+            newServers := make([]*server_details.ServerDetails, len(sc.servers))
+            copy(newServers, sc.servers)
             // Overwrite the existing server details in order to
             // possibly update metadata (e.g. server version)
             newServers[idx] = server
-            serverCfg.servers = newServers
+            sc.servers = newServers
             found = true
             break
         }
@@ -128,13 +128,13 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
     // Add to the list if not known
     if !found {
-        newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1)
-        copy(newServers, serverCfg.servers)
+        newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1)
+        copy(newServers, sc.servers)
         newServers = append(newServers, server)
-        serverCfg.servers = newServers
+        sc.servers = newServers
     }
-    sm.saveServerConfig(serverCfg)
+    sm.saveServerConfig(sc)
 }
 // cycleServers returns a new list of servers that has dequeued the first
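
The NOTE above and the AddServer hunks show the pattern this file relies on: the server list is never mutated in place. A writer takes serverConfigLock, copies the slice, edits the copy, and publishes it with saveServerConfig. A minimal copy-on-write sketch of that idea, using hypothetical types rather than the real ServerManager internals:

package main

import "sync"

// Illustrative stand-ins, not the real consul/server_manager types.
type server struct{ Name string }

type manager struct {
    mu      sync.Mutex
    servers []*server // treated as immutable once published
}

// addServer copies the slice before changing it, so concurrent readers that
// grabbed the old slice keep a consistent view without holding the lock.
func (m *manager) addServer(s *server) {
    m.mu.Lock()
    defer m.mu.Unlock()

    for i, existing := range m.servers {
        if existing.Name == s.Name {
            fresh := make([]*server, len(m.servers))
            copy(fresh, m.servers)
            fresh[i] = s // refresh metadata for a known server
            m.servers = fresh
            return
        }
    }
    fresh := make([]*server, len(m.servers), len(m.servers)+1)
    copy(fresh, m.servers)
    m.servers = append(fresh, s)
}

func main() {
    m := &manager{}
    m.addServer(&server{Name: "consul1"})
    m.addServer(&server{Name: "consul1"}) // takes the update path
}
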
@@ -193,8 +193,8 @@ func (sc *serverConfig) shuffleServers() {
 // during an RPC call, it is rotated to the end of the list. If there are no
 // servers available, return nil.
 func (sm *ServerManager) FindServer() *server_details.ServerDetails {
-    serverCfg := sm.getServerConfig()
-    numServers := len(serverCfg.servers)
+    sc := sm.getServerConfig()
+    numServers := len(sc.servers)
     if numServers == 0 {
         sm.logger.Printf("[WARN] server manager: No servers available")
         return nil
@@ -203,7 +203,7 @@ func (sm *ServerManager) FindServer() *server_details.ServerDetails {
         // assumed to be the oldest in the server list (unless -
         // hypothetically - the server list was rotated right after a
         // server was added).
-        return serverCfg.servers[0]
+        return sc.servers[0]
     }
 }
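
FindServer always hands back sc.servers[0]; moving a failed server out of that slot is the job of cycleServer, which the diff references but does not show. A rough sketch of that rotation under the same copy-on-write convention (the name and signature here are hypothetical):

// cycleFirst returns a new slice with the first entry moved to the back,
// leaving the input slice untouched. Sketch only, not the real cycleServer.
func cycleFirst(servers []string) []string {
    if len(servers) < 2 {
        return servers // nothing useful to rotate
    }
    rotated := make([]string, 0, len(servers))
    rotated = append(rotated, servers[1:]...)
    return append(rotated, servers[0])
}
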
@@ -237,7 +237,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulCluster
 // NotifyFailedServer marks the passed in server as "failed" by rotating it
 // to the end of the server list.
 func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
-    serverCfg := sm.getServerConfig()
+    sc := sm.getServerConfig()
     // If the server being failed is not the first server on the list,
     // this is a noop. If, however, the server is failed and first on
@@ -245,7 +245,7 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
     // the server to the end of the list.
     // Only rotate the server list when there is more than one server
-    if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server &&
+    if len(sc.servers) > 1 && sc.servers[0] == server &&
         // Use atomic.CAS to emulate a TryLock().
         atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
         defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
@@ -254,11 +254,11 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
         // server to the end.
         sm.serverConfigLock.Lock()
         defer sm.serverConfigLock.Unlock()
-        serverCfg = sm.getServerConfig()
-        if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server {
-            serverCfg.servers = serverCfg.cycleServer()
-            sm.saveServerConfig(serverCfg)
+        sc = sm.getServerConfig()
+        if len(sc.servers) > 1 && sc.servers[0] == server {
+            sc.servers = sc.cycleServer()
+            sm.saveServerConfig(sc)
         }
     }
 }
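
The comment in the hunk above spells out the trick: NotifyFailedServer reads the config without a lock and only enters the locked slow path after winning atomic.CompareAndSwapInt32, i.e. CAS used as a non-blocking TryLock. A stripped-down sketch of that gate (the notifier type and field names are illustrative):

package main

import (
    "fmt"
    "sync/atomic"
)

type notifier struct {
    barrier int32 // 0 = free, 1 = someone is already handling the failure
}

// tryHandle runs fn only if no other goroutine is already in the critical
// section; it never blocks, which is the point of CAS-as-TryLock.
func (n *notifier) tryHandle(fn func()) bool {
    if !atomic.CompareAndSwapInt32(&n.barrier, 0, 1) {
        return false // someone else won the race; skip the work
    }
    defer atomic.StoreInt32(&n.barrier, 0)
    fn()
    return true
}

func main() {
    var n notifier
    fmt.Println(n.tryHandle(func() { fmt.Println("rotating failed server") }))
}
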
@@ -266,8 +266,8 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
 // NumServers takes out an internal "read lock" and returns the number of
 // servers. numServers includes both healthy and unhealthy servers.
 func (sm *ServerManager) NumServers() (numServers int) {
-    serverCfg := sm.getServerConfig()
-    numServers = len(serverCfg.servers)
+    sc := sm.getServerConfig()
+    numServers = len(sc.servers)
     return numServers
 }
@@ -286,24 +286,24 @@ func (sm *ServerManager) NumServers() (numServers int) {
 func (sm *ServerManager) RebalanceServers() {
 FAILED_SERVER_DURING_REBALANCE:
     // Obtain a copy of the server config
-    serverCfg := sm.getServerConfig()
+    sc := sm.getServerConfig()
     // Early abort if there is no value to shuffling
-    if len(serverCfg.servers) < 2 {
-        // sm.logger.Printf("[INFO] server manager: can't rebalance with only %d servers", len(serverCfg.servers))
+    if len(sc.servers) < 2 {
+        // sm.logger.Printf("[INFO] server manager: can't rebalance with only %d servers", len(sc.servers))
         return
     }
-    serverCfg.shuffleServers()
+    sc.shuffleServers()
     // Iterate through the shuffled server list to find a healthy server.
     // Don't iterate on the list directly, this loop mutates server the
     // list.
     var foundHealthyServer bool
-    for n := len(serverCfg.servers); n > 0; n-- {
+    for n := len(sc.servers); n > 0; n-- {
         // Always test the first server. Failed servers are cycled
         // while Serf detects the node has failed.
-        selectedServer := serverCfg.servers[0]
+        selectedServer := sc.servers[0]
         // sm.logger.Printf("[INFO] server manager: Preemptively testing server %s before rebalance", selectedServer.String())
         ok := sm.connPoolPinger.PingConsulServer(selectedServer)
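
RebalanceServers leans on sc.shuffleServers(), which this commit does not touch. For reference, a Fisher-Yates shuffle over a copied slice is the usual way to implement such a method; the sketch below is an assumption about its shape, not the literal serverConfig method:

package main

import (
    "fmt"
    "math/rand"
)

// shuffled returns a Fisher-Yates shuffled copy of the input, leaving the
// original slice alone. Sketch only.
func shuffled(names []string) []string {
    out := make([]string, len(names))
    copy(out, names)
    for i := len(out) - 1; i > 0; i-- {
        j := rand.Intn(i + 1)
        out[i], out[j] = out[j], out[i]
    }
    return out
}

func main() {
    fmt.Println(shuffled([]string{"s1", "s2", "s3"}))
}
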
@@ -311,7 +311,7 @@ FAILED_SERVER_DURING_REBALANCE:
             foundHealthyServer = true
             break
         }
-        serverCfg.cycleServer()
+        sc.cycleServer()
     }
     // If no healthy servers were found, sleep and wait for Serf to make
@@ -341,7 +341,7 @@ FAILED_SERVER_DURING_REBALANCE:
        state byte
     }
     mergedList := make(map[server_details.Key]*targetServer)
-    for _, s := range serverCfg.servers {
+    for _, s := range sc.servers {
         mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
     }
     for _, s := range tmpServerCfg.servers {
@@ -355,7 +355,7 @@ FAILED_SERVER_DURING_REBALANCE:
     }
     // Ensure the selected server has not been removed by Serf
-    selectedServerKey := serverCfg.servers[0].Key()
+    selectedServerKey := sc.servers[0].Key()
     if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
         return false
     }
@@ -367,16 +367,16 @@ FAILED_SERVER_DURING_REBALANCE:
            // Do nothing, server exists in both
        case 'o':
            // Server has been removed
-           serverCfg.removeServerByKey(&k)
+           sc.removeServerByKey(&k)
        case 'n':
            // Server added
-           serverCfg.servers = append(serverCfg.servers, v.server)
+           sc.servers = append(sc.servers, v.server)
        default:
            panic("not implemented")
        }
     }
-    sm.saveServerConfig(serverCfg)
+    sm.saveServerConfig(sc)
     return true
 }
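
The switch above reconciles the rebalanced list with a fresh snapshot of the cluster: entries only in the old list ('o') are dropped, entries only in the snapshot ('n') are appended, and entries in both ('b') are kept. A compact sketch of that reconciliation keyed by name, with hypothetical helper names standing in for serverConfig and tmpServerCfg:

// mergeLists reconciles an old and a new server list by name.
// 'o' = old only (remove), 'n' = new only (add), 'b' = both (keep).
// Sketch only; mirrors the shape of the diff, not the exact Consul code.
func mergeLists(oldList, newList []string) []string {
    state := make(map[string]byte, len(oldList)+len(newList))
    for _, s := range oldList {
        state[s] = 'o'
    }
    for _, s := range newList {
        if _, ok := state[s]; ok {
            state[s] = 'b'
        } else {
            state[s] = 'n'
        }
    }

    merged := make([]string, 0, len(newList))
    for _, s := range oldList {
        if state[s] != 'o' { // drop servers the snapshot no longer knows about
            merged = append(merged, s)
        }
    }
    for _, s := range newList {
        if state[s] == 'n' { // append servers added meanwhile
            merged = append(merged, s)
        }
    }
    return merged
}
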
@@ -384,7 +384,7 @@ FAILED_SERVER_DURING_REBALANCE:
         goto FAILED_SERVER_DURING_REBALANCE
     }
-    sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(serverCfg.servers), serverCfg.servers[0].String())
+    sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String())
     return
 }
@@ -393,17 +393,17 @@ FAILED_SERVER_DURING_REBALANCE:
 func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
     sm.serverConfigLock.Lock()
     defer sm.serverConfigLock.Unlock()
-    serverCfg := sm.getServerConfig()
+    sc := sm.getServerConfig()
     // Remove the server if known
-    for i, _ := range serverCfg.servers {
-        if serverCfg.servers[i].Name == server.Name {
-            newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1)
-            newServers = append(newServers, serverCfg.servers[:i]...)
-            newServers = append(newServers, serverCfg.servers[i+1:]...)
-            serverCfg.servers = newServers
-            sm.saveServerConfig(serverCfg)
+    for i, _ := range sc.servers {
+        if sc.servers[i].Name == server.Name {
+            newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1)
+            newServers = append(newServers, sc.servers[:i]...)
+            newServers = append(newServers, sc.servers[i+1:]...)
+            sc.servers = newServers
+            sm.saveServerConfig(sc)
             return
         }
     }
@@ -411,8 +411,8 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 // refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires.
 func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration {
-    serverCfg := sm.getServerConfig()
-    numConsulServers := len(serverCfg.servers)
+    sc := sm.getServerConfig()
+    numConsulServers := len(sc.servers)
     // Limit this connection's life based on the size (and health) of the
     // cluster. Never rebalance a connection more frequently than
     // connReuseLowWatermarkDuration, and make sure we never exceed
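
The final hunk reaches into refreshServerRebalanceTimer, whose comment describes scaling the rebalance interval with cluster size while never rebalancing more often than connReuseLowWatermarkDuration. The sketch below shows that kind of clamped scaling; the constants and the scaling rule are assumptions for illustration, not the actual Consul formula:

package main

import (
    "fmt"
    "time"
)

const (
    // Assumed values for illustration only.
    connReuseLowWatermark = 2 * time.Minute
    perServerScale        = 30 * time.Second
)

// rebalanceInterval grows with the number of clients sharing the servers,
// but never drops below the low watermark, so a small cluster is not
// hammered with constant reconnects.
func rebalanceInterval(numServers, numClients int) time.Duration {
    if numServers < 1 {
        numServers = 1
    }
    d := time.Duration(numClients/numServers) * perServerScale
    if d < connReuseLowWatermark {
        d = connReuseLowWatermark
    }
    return d
}

func main() {
    fmt.Println(rebalanceInterval(3, 3000))
}
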