Simplify Raft peer adds using only reconciliation

Armon Dadgar 2014-01-10 12:55:55 -08:00
parent 28c1e556d6
commit cf29019545
5 changed files with 141 additions and 88 deletions
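The change replaces the per-event goroutines in the Serf handler (the old go s.reconcileMember(m) and go s.joinConsulServer(m, port)) with a single bounded channel: the LAN handler now does a non-blocking send of each member onto reconcileCh, and the leader loop drains that channel alongside its periodic full reconcile, so Raft peer adds and removes happen only from the reconcile path. The producer/consumer shape can be sketched independently of Consul; the code below is illustrative only, with toy names rather than the real serf/raft types:

package main

import (
	"fmt"
	"time"
)

// member is a stand-in for serf.Member in this sketch.
type member struct{ name string }

// enqueue mirrors the new Serf handler: a non-blocking send, so event
// processing never blocks on the leader loop. If the buffer is full the
// event is dropped; the periodic reconcile below is the backstop.
func enqueue(ch chan member, m member) {
	select {
	case ch <- m:
	default:
	}
}

func main() {
	reconcileCh := make(chan member, 32) // same bound the commit uses
	stop := time.After(200 * time.Millisecond)
	interval := time.After(50 * time.Millisecond)

	enqueue(reconcileCh, member{"node-a"})
	enqueue(reconcileCh, member{"node-b"})

	// Mirrors the shape of the new leaderLoop: drain member updates as
	// they arrive, and fall back to a periodic full reconcile.
	for {
		select {
		case <-stop:
			return
		case <-interval:
			fmt.Println("periodic full reconcile")
			interval = time.After(50 * time.Millisecond)
		case m := <-reconcileCh:
			fmt.Println("reconcile member:", m.name)
		}
	}
}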

@@ -2,7 +2,9 @@ package consul
import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"net"
"time"
)
@@ -38,7 +40,15 @@ func (s *Server) monitorLeadership() {
// leaderLoop runs as long as we are the leader to run various
// maintenance activities
func (s *Server) leaderLoop(stopCh chan struct{}) {
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf.Member
RECONCILE:
// Setup a reconciliation timer
reconcileCh = nil
interval := time.After(s.config.ReconcileInterval)
// Apply a raft barrier to ensure our FSM is caught up
barrier := s.raft.Barrier(0)
if err := barrier.Error(); err != nil {
@@ -52,15 +62,24 @@ RECONCILE:
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s.reconcileCh
WAIT:
- // Periodically reconcile as long as we are the leader
- select {
- case <-time.After(s.config.ReconcileInterval):
- goto RECONCILE
- case <-stopCh:
- return
- case <-s.shutdownCh:
- return
// Periodically reconcile as long as we are the leader,
// or when Serf events arrive
for {
select {
case <-stopCh:
return
case <-s.shutdownCh:
return
case <-interval:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
}
}
}
@@ -127,6 +146,11 @@ func (s *Server) handleAliveMember(member serf.Member) error {
Service: "consul",
Port: port,
}
// Attempt to join the consul server
if err := s.joinConsulServer(member, port); err != nil {
return err
}
}
// Check if the node exists
@@ -220,6 +244,17 @@ func (s *Server) handleLeftMember(member serf.Member) error {
}
s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name)
// Remove from Raft peers if this was a server
if valid, _, port := isConsulServer(member); valid {
peer := &net.TCPAddr{IP: member.Addr, Port: port}
future := s.raft.RemovePeer(peer)
if err := future.Error(); err != nil && err != raft.UnknownPeer {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
peer, err)
return err
}
}
// Deregister the node
req := structs.DeregisterRequest{
Datacenter: s.config.Datacenter,
@@ -228,3 +263,20 @@ func (s *Server) handleLeftMember(member serf.Member) error {
var out struct{}
return s.endpoints.Catalog.Deregister(&req, &out)
}
// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, port int) error {
// Do not join ourself
if m.Name == s.config.NodeName {
return nil
}
// Attempt to add as a peer
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
future := s.raft.AddPeer(addr)
if err := future.Error(); err != nil && err != raft.KnownPeer {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
return nil
}
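One detail in the new leaderLoop above: the local reconcileCh variable starts out nil and is only assigned from s.reconcileCh once the barrier and initial reconcile succeed, and it is reset to nil at the top of each RECONCILE pass. Because a receive from a nil channel blocks forever, the member case of the select is effectively disabled until the leader's state is known to be caught up. A minimal standalone illustration of that Go behavior (not Consul code):

package main

import (
	"fmt"
	"time"
)

func main() {
	events := make(chan string, 1)
	events <- "member update"

	// gate starts nil, so its receive case can never fire: only the timer
	// arm of the select is live, just like reconcileCh before the initial
	// reconcile has succeeded.
	var gate chan string

	for i := 0; i < 2; i++ {
		select {
		case e := <-gate:
			fmt.Println("processed:", e)
		case <-time.After(10 * time.Millisecond):
			fmt.Println("timer fired; enabling the channel case")
			gate = events
		}
	}
}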

@@ -188,3 +188,72 @@ func TestLeader_Reconcile(t *testing.T) {
t.Fatalf("client not registered")
}
}
func TestLeader_LeftServer(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
// Wait until we have 3 peers
start := time.Now()
CHECK1:
for _, s := range servers {
peers, _ := s.raftPeers.Peers()
if len(peers) != 3 {
if time.Now().Sub(start) >= 2*time.Second {
t.Fatalf("should have 3 peers")
} else {
time.Sleep(100 * time.Millisecond)
goto CHECK1
}
}
}
// Kill any server
servers[0].Shutdown()
// Wait for failure detection
time.Sleep(500 * time.Millisecond)
// Force remove the non-leader (transition to left state)
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName); err != nil {
t.Fatalf("err: %v", err)
}
// Wait for intent propagation
time.Sleep(500 * time.Millisecond)
// Wait until we have 2 peers
start = time.Now()
CHECK2:
for _, s := range servers[1:] {
peers, _ := s.raftPeers.Peers()
if len(peers) != 2 {
if time.Now().Sub(start) >= 2*time.Second {
t.Fatalf("should have 2 peers")
} else {
time.Sleep(100 * time.Millisecond)
goto CHECK2
}
}
}
}

@@ -1,10 +1,8 @@
package consul
import (
- "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"net"
- "time"
)
// lanEventHandler is used to handle events from the lan Serf cluster
@@ -14,7 +12,6 @@ func (s *Server) lanEventHandler() {
case e := <-s.eventChLAN:
switch e.EventType() {
case serf.EventMemberJoin:
- s.localJoin(e.(serf.MemberEvent))
fallthrough
case serf.EventMemberLeave:
fallthrough
@@ -62,26 +59,12 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) {
return
}
- // Dispatch an async handler for each member
// Queue the members for reconciliation
for _, m := range me.Members {
- go s.reconcileMember(m)
select {
case s.reconcileCh <- m:
default:
}
}
}
- // localJoin is used to handle join events on the lan serf cluster
- func (s *Server) localJoin(me serf.MemberEvent) {
- // Check for consul members
- for _, m := range me.Members {
- ok, dc, port := isConsulServer(m)
- if !ok {
- continue
- }
- if dc != s.config.Datacenter {
- s.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
- m.Name, dc)
- continue
- }
- go s.joinConsulServer(m, port)
- }
- }
@@ -147,60 +130,3 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
s.remoteLock.Unlock()
}
}
- // joinConsulServer is used to try to join another consul server
- func (s *Server) joinConsulServer(m serf.Member, port int) {
- if m.Name == s.config.NodeName {
- return
- }
- var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
- var future raft.Future
- CHECK:
- // Get the Raft peers
- peers, err := s.raftPeers.Peers()
- if err != nil {
- s.logger.Printf("[ERR] consul: failed to get raft peers: %v", err)
- goto WAIT
- }
- // Bail if this node is already a peer
- for _, p := range peers {
- if p.String() == addr.String() {
- return
- }
- }
- // Bail if the node is not alive
- if memberStatus(s.serfLAN.Members(), m.Name) != serf.StatusAlive {
- return
- }
- // Attempt to add as a peer
- future = s.raft.AddPeer(addr)
- if err := future.Error(); err != nil {
- s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
- } else {
- return
- }
- WAIT:
- time.Sleep(500 * time.Millisecond)
- select {
- case <-s.shutdownCh:
- return
- default:
- goto CHECK
- }
- }
- // memberStatus scans a list of members for a matching one,
- // returning the status or StatusNone
- func memberStatus(members []serf.Member, name string) serf.MemberStatus {
- for _, m := range members {
- if m.Name == name {
- return m.Status
- }
- }
- return serf.StatusNone
- }
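The removed serf.go helper had to poll the Raft peer list, check liveness, and retry in a sleep loop because AddPeer could race with membership changes. The replacement in leader.go leans on the raft library's sentinel errors instead: AddPeer returning raft.KnownPeer and RemovePeer returning raft.UnknownPeer are treated as success, which makes the operations idempotent, and any transient failure is simply retried on the next reconcile pass. A rough sketch of that idempotent shape, against a hypothetical stand-in interface and error rather than the real raft package:

package main

import (
	"errors"
	"fmt"
	"net"
)

// errKnownPeer is a stand-in for raft.KnownPeer, which the library returns
// when the address is already in the peer set.
var errKnownPeer = errors.New("peer already known")

// peerAdder is a hypothetical narrow view of the Raft membership API, used
// only for this sketch.
type peerAdder interface {
	AddPeer(addr net.Addr) error
}

// ensurePeer treats "already a peer" as success, so it is safe to call on
// every reconcile pass; real errors bubble up and are retried next pass.
func ensurePeer(r peerAdder, addr net.Addr) error {
	if err := r.AddPeer(addr); err != nil && err != errKnownPeer {
		return err
	}
	return nil
}

// fakeRaft is a toy in-memory peer set for demonstration.
type fakeRaft struct{ peers map[string]bool }

func (f *fakeRaft) AddPeer(addr net.Addr) error {
	if f.peers[addr.String()] {
		return errKnownPeer
	}
	f.peers[addr.String()] = true
	return nil
}

func main() {
	r := &fakeRaft{peers: make(map[string]bool)}
	addr := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8300}
	fmt.Println(ensurePeer(r, addr)) // <nil>: peer added
	fmt.Println(ensurePeer(r, addr)) // <nil>: already known, still success
}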

@@ -53,6 +53,11 @@ type Server struct {
raftStore *raft.MDBStore
raftTransport *raft.NetworkTransport
// reconcileCh is used to pass events from the serf handler
// into the leader manager, so that the strong state can be
// updated
reconcileCh chan serf.Member
// remoteConsuls is used to track the known consuls in
// remote data centers. Used to do DC forwarding.
remoteConsuls map[string][]net.Addr
@@ -110,6 +115,7 @@ func NewServer(config *Config) (*Server, error) {
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]net.Addr),
rpcClients: make(map[net.Conn]struct{}),
rpcServer: rpc.NewServer(),
@@ -202,7 +208,7 @@ func (s *Server) setupRaft() error {
}
// Create a transport layer
- trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
trans := raft.NewNetworkTransport(s.raftLayer, 3, 500*time.Millisecond, s.config.LogOutput)
s.raftTransport = trans
// Setup the peer store

@@ -64,7 +64,7 @@ func testServerDCBootstrap(t *testing.T, dc string, bootstrap bool) (string, *Server) {
config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
- config.ReconcileInterval = 50 * time.Millisecond
config.ReconcileInterval = 100 * time.Millisecond
server, err := NewServer(config)
if err != nil {