mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 19:50:36 +00:00
Adds offline detection.
This commit is contained in:
parent
7c27ca1f77
commit
7525836b28
@ -99,6 +99,10 @@ type Manager struct {
|
||||
// notifyFailedBarrier is acts as a barrier to prevent queuing behind
|
||||
// serverListLog and acts as a TryLock().
|
||||
notifyFailedBarrier int32
|
||||
|
||||
// offline is used to indicate that there are no servers, or that all
|
||||
// known servers have failed the ping test.
|
||||
offline int32
|
||||
}
|
||||
|
||||
// AddServer takes out an internal write lock and adds a new server. If the
|
||||
@ -136,6 +140,10 @@ func (m *Manager) AddServer(s *agent.Server) {
|
||||
l.servers = newServers
|
||||
}
|
||||
|
||||
// Assume we are no longer offline since we've just seen a new server.
|
||||
atomic.StoreInt32(&m.offline, 0)
|
||||
|
||||
// Start using this list of servers.
|
||||
m.saveServerList(l)
|
||||
}
|
||||
|
||||
@ -180,6 +188,13 @@ func (l *serverList) shuffleServers() {
|
||||
}
|
||||
}
|
||||
|
||||
// IsOffline checks to see if all the known servers have failed their ping
|
||||
// test during the last rebalance.
|
||||
func (m *Manager) IsOffline() bool {
|
||||
offline := atomic.LoadInt32(&m.offline)
|
||||
return offline == 1
|
||||
}
|
||||
|
||||
// FindServer takes out an internal "read lock" and searches through the list
|
||||
// of servers to find a "healthy" server. If the server is actually
|
||||
// unhealthy, we rely on Serf to detect this and remove the node from the
|
||||
@ -221,6 +236,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCl
|
||||
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
|
||||
m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
|
||||
m.shutdownCh = shutdownCh
|
||||
m.offline = 1
|
||||
|
||||
l := serverList{}
|
||||
l.servers = make([]*agent.Server, 0)
|
||||
@ -280,11 +296,7 @@ func (m *Manager) RebalanceServers() {
|
||||
// Obtain a copy of the current serverList
|
||||
l := m.getServerList()
|
||||
|
||||
// Early abort if there is nothing to shuffle
|
||||
if len(l.servers) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
// Shuffle servers so we have a chance of picking a new one.
|
||||
l.shuffleServers()
|
||||
|
||||
// Iterate through the shuffled server list to find an assumed
|
||||
@ -307,8 +319,11 @@ func (m *Manager) RebalanceServers() {
|
||||
}
|
||||
|
||||
// If no healthy servers were found, sleep and wait for Serf to make
|
||||
// the world a happy place again.
|
||||
if !foundHealthyServer {
|
||||
// the world a happy place again. Update the offline status.
|
||||
if foundHealthyServer {
|
||||
atomic.StoreInt32(&m.offline, 0)
|
||||
} else {
|
||||
atomic.StoreInt32(&m.offline, 1)
|
||||
m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting")
|
||||
return
|
||||
}
|
||||
|
@ -77,6 +77,45 @@ func TestServers_AddServer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// func (m *Manager) IsOffline() bool {
|
||||
func TestServers_IsOffline(t *testing.T) {
|
||||
m := testManager()
|
||||
if !m.IsOffline() {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
s1 := &agent.Server{Name: "s1"}
|
||||
m.AddServer(s1)
|
||||
if m.IsOffline() {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
m.RebalanceServers()
|
||||
if m.IsOffline() {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
m.RemoveServer(s1)
|
||||
m.RebalanceServers()
|
||||
if !m.IsOffline() {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
const failPct = 0.5
|
||||
m = testManagerFailProb(failPct)
|
||||
m.AddServer(s1)
|
||||
var on, off int
|
||||
for i := 0; i < 100; i++ {
|
||||
m.RebalanceServers()
|
||||
if m.IsOffline() {
|
||||
off++
|
||||
} else {
|
||||
on++
|
||||
}
|
||||
}
|
||||
if on == 0 || off == 0 {
|
||||
t.Fatalf("bad: %d %d", on, off)
|
||||
}
|
||||
}
|
||||
|
||||
// func (m *Manager) FindServer() (server *agent.Server) {
|
||||
func TestServers_FindServer(t *testing.T) {
|
||||
m := testManager()
|
||||
|
@ -189,6 +189,7 @@ func (r *Router) addServer(area *areaInfo, s *agent.Server) error {
|
||||
|
||||
managers := r.managers[s.Datacenter]
|
||||
r.managers[s.Datacenter] = append(managers, manager)
|
||||
go manager.Start()
|
||||
}
|
||||
|
||||
info.manager.AddServer(s)
|
||||
@ -283,6 +284,10 @@ func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
|
||||
|
||||
// Try each manager until we get a server.
|
||||
for _, manager := range managers {
|
||||
if manager.IsOffline() {
|
||||
continue
|
||||
}
|
||||
|
||||
if s := manager.FindServer(); s != nil {
|
||||
return manager, s, true
|
||||
}
|
||||
|
@ -232,6 +232,86 @@ func TestRouter_Routing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_Routing_Offline(t *testing.T) {
|
||||
r := testRouter("dc0")
|
||||
|
||||
// Create a WAN-looking area.
|
||||
self := "node0.dc0"
|
||||
wan := testCluster(self)
|
||||
if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{1.0}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Adding the area should enable all the routes right away.
|
||||
if _, _, ok := r.FindRoute("dc0"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dc1"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dc2"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dcX"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Do a rebalance for dc1, which should knock it offline.
|
||||
func() {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
area, ok := r.areas[types.AreaWAN]
|
||||
if !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
info, ok := area.managers["dc1"]
|
||||
if !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
info.manager.RebalanceServers()
|
||||
}()
|
||||
|
||||
// Recheck all the routes.
|
||||
if _, _, ok := r.FindRoute("dc0"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dc1"); ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dc2"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dcX"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Add another area with a route to dc1.
|
||||
otherID := types.AreaID("other")
|
||||
other := newMockCluster(self)
|
||||
other.AddMember("dc0", "node0", nil)
|
||||
other.AddMember("dc1", "node1", nil)
|
||||
if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Recheck all the routes and make sure it finds the one that's
|
||||
// online.
|
||||
if _, _, ok := r.FindRoute("dc0"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dc1"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dc2"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if _, _, ok := r.FindRoute("dcX"); !ok {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_GetDatacenters(t *testing.T) {
|
||||
r := testRouter("dc0")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user