mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 19:50:36 +00:00
Merge pull request #621 from hashicorp/f-leave
Changing interaction between Leave and RemovePeers
This commit is contained in:
commit
149f25aaff
@ -469,9 +469,11 @@ func (s *Server) handleReapMember(member serf.Member) error {
|
||||
// handleDeregisterMember is used to deregister a member of a given reason
|
||||
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
|
||||
state := s.fsm.State()
|
||||
// Check if the node does not exists
|
||||
_, found, _ := state.GetNode(member.Name)
|
||||
if !found {
|
||||
// Do not deregister ourself. This can only happen if the current leader
|
||||
// is leaving. Instead, we should allow a follower to take-over and
|
||||
// deregister us later.
|
||||
if member.Name == s.config.NodeName {
|
||||
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -483,11 +485,9 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
|
||||
}
|
||||
}
|
||||
|
||||
// Do not deregister ourself. This can only happen if the current leader
|
||||
// is leaving. Instead, we should allow a follower to take-over and
|
||||
// deregister us later.
|
||||
if member.Name == s.config.NodeName {
|
||||
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
|
||||
// Check if the node does not exists
|
||||
_, found, _ := state.GetNode(member.Name)
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -53,6 +53,10 @@ const (
|
||||
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
||||
// This is used to reduce disk I/O for the recently commited entries.
|
||||
raftLogCacheSize = 512
|
||||
|
||||
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
|
||||
// to replicate to gracefully leave the cluster.
|
||||
raftRemoveGracePeriod = 5 * time.Second
|
||||
)
|
||||
|
||||
// Server is Consul server which manages the service discovery,
|
||||
@ -521,6 +525,25 @@ func (s *Server) Leave() error {
|
||||
s.logger.Printf("[INFO] consul: server starting leave")
|
||||
s.left = true
|
||||
|
||||
// Check the number of known peers
|
||||
numPeers, err := s.numOtherPeers()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// If we are the current leader, and we have any other peers (cluster has multiple
|
||||
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
|
||||
// not the leader, then we should issue our leave intention and wait to be removed
|
||||
// for some sane period of time.
|
||||
isLeader := s.IsLeader()
|
||||
if isLeader && numPeers > 0 {
|
||||
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
|
||||
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
|
||||
s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Leave the WAN pool
|
||||
if s.serfWAN != nil {
|
||||
if err := s.serfWAN.Leave(); err != nil {
|
||||
@ -534,9 +557,47 @@ func (s *Server) Leave() error {
|
||||
s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// If we were not leader, wait to be safely removed from the cluster.
|
||||
// We must wait to allow the raft replication to take place, otherwise
|
||||
// an immediate shutdown could cause a loss of quorum.
|
||||
if !isLeader {
|
||||
limit := time.Now().Add(raftRemoveGracePeriod)
|
||||
for numPeers > 0 && time.Now().Before(limit) {
|
||||
// Update the number of peers
|
||||
numPeers, err = s.numOtherPeers()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
// Avoid the sleep if we are done
|
||||
if numPeers == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// Sleep a while and check again
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
if numPeers != 0 {
|
||||
s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// numOtherPeers is used to check on the number of known peers
|
||||
// excluding the local ndoe
|
||||
func (s *Server) numOtherPeers() (int, error) {
|
||||
peers, err := s.raftPeers.Peers()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
|
||||
return len(otherPeers), nil
|
||||
}
|
||||
|
||||
// JoinLAN is used to have Consul join the inner-DC pool
|
||||
// The target address should be another node inside the DC
|
||||
// listening on the Serf LAN address
|
||||
|
@ -216,6 +216,61 @@ func TestServer_JoinWAN(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestServer_LeaveLeader(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Second server not in bootstrap mode
|
||||
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
var p1 []net.Addr
|
||||
var p2 []net.Addr
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
p1, _ = s1.raftPeers.Peers()
|
||||
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
p2, _ = s2.raftPeers.Peers()
|
||||
return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
|
||||
// Issue a leave to the leader
|
||||
for _, s := range []*Server{s1, s2} {
|
||||
if !s.IsLeader() {
|
||||
continue
|
||||
}
|
||||
if err := s.Leave(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Should lose a peer
|
||||
for _, s := range []*Server{s1, s2} {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
p1, _ = s.raftPeers.Peers()
|
||||
return len(p1) == 1, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 1 peer: %v", p1)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Leave(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
@ -250,18 +305,25 @@ func TestServer_Leave(t *testing.T) {
|
||||
t.Fatalf("should have 2 peers: %v", err)
|
||||
})
|
||||
|
||||
// Issue a leave
|
||||
if err := s2.Leave(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
// Issue a leave to the non-leader
|
||||
for _, s := range []*Server{s1, s2} {
|
||||
if s.IsLeader() {
|
||||
continue
|
||||
}
|
||||
if err := s.Leave(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Should lose a peer
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
p1, _ = s1.raftPeers.Peers()
|
||||
return len(p1) == 1, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 1 peer: %v", p1)
|
||||
})
|
||||
for _, s := range []*Server{s1, s2} {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
p1, _ = s.raftPeers.Peers()
|
||||
return len(p1) == 1, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 1 peer: %v", p1)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_RPC(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user