diff --git a/consul/leader.go b/consul/leader.go index e3debbff08..cf1e434fb5 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -2,7 +2,9 @@ package consul import ( "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "net" "time" ) @@ -38,7 +40,15 @@ func (s *Server) monitorLeadership() { // leaderLoop runs as long as we are the leader to run various // maintence activities func (s *Server) leaderLoop(stopCh chan struct{}) { + // Reconcile channel is only used once initial reconcile + // has succeeded + var reconcileCh chan serf.Member + RECONCILE: + // Setup a reconciliation timer + reconcileCh = nil + interval := time.After(s.config.ReconcileInterval) + // Apply a raft barrier to ensure our FSM is caught up barrier := s.raft.Barrier(0) if err := barrier.Error(); err != nil { @@ -52,15 +62,24 @@ RECONCILE: goto WAIT } + // Initial reconcile worked, now we can process the channel + // updates + reconcileCh = s.reconcileCh + WAIT: - // Periodically reconcile as long as we are the leader - select { - case <-time.After(s.config.ReconcileInterval): - goto RECONCILE - case <-stopCh: - return - case <-s.shutdownCh: - return + // Periodically reconcile as long as we are the leader, + // or when Serf events arrive + for { + select { + case <-stopCh: + return + case <-s.shutdownCh: + return + case <-interval: + goto RECONCILE + case member := <-reconcileCh: + s.reconcileMember(member) + } } } @@ -127,6 +146,11 @@ func (s *Server) handleAliveMember(member serf.Member) error { Service: "consul", Port: port, } + + // Attempt to join the consul server + if err := s.joinConsulServer(member, port); err != nil { + return err + } } // Check if the node exists @@ -220,6 +244,17 @@ func (s *Server) handleLeftMember(member serf.Member) error { } s.logger.Printf("[INFO] consul: member '%s' left, deregistering", member.Name) + // Remove from Raft peers if this was a server + if valid, _, port := isConsulServer(member); valid { + peer := &net.TCPAddr{IP: member.Addr, Port: port} + future := s.raft.RemovePeer(peer) + if err := future.Error(); err != nil && err != raft.UnknownPeer { + s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v", + peer, err) + return err + } + } + // Deregister the node req := structs.DeregisterRequest{ Datacenter: s.config.Datacenter, @@ -228,3 +263,20 @@ func (s *Server) handleLeftMember(member serf.Member) error { var out struct{} return s.endpoints.Catalog.Deregister(&req, &out) } + +// joinConsulServer is used to try to join another consul server +func (s *Server) joinConsulServer(m serf.Member, port int) error { + // Do not join ourself + if m.Name == s.config.NodeName { + return nil + } + + // Attempt to add as a peer + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} + future := s.raft.AddPeer(addr) + if err := future.Error(); err != nil && err != raft.KnownPeer { + s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) + return err + } + return nil +} diff --git a/consul/leader_test.go b/consul/leader_test.go index f0acbd0d65..2202c77975 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -188,3 +188,72 @@ func TestLeader_Reconcile(t *testing.T) { t.Fatalf("client not registered") } } + +func TestLeader_LeftServer(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait until we have 3 peers + start := time.Now() +CHECK1: + for _, s := range servers { + peers, _ := s.raftPeers.Peers() + if len(peers) != 3 { + if time.Now().Sub(start) >= 2*time.Second { + t.Fatalf("should have 3 peers") + } else { + time.Sleep(100 * time.Millisecond) + goto CHECK1 + } + } + } + + // Kill any server + servers[0].Shutdown() + + // Wait for failure detection + time.Sleep(500 * time.Millisecond) + + // Force remove the non-leader (transition to left state) + if err := servers[1].RemoveFailedNode(servers[0].config.NodeName); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for intent propagation + time.Sleep(500 * time.Millisecond) + + // Wait until we have 2 peers + start = time.Now() +CHECK2: + for _, s := range servers[1:] { + peers, _ := s.raftPeers.Peers() + if len(peers) != 2 { + if time.Now().Sub(start) >= 2*time.Second { + t.Fatalf("should have 2 peers") + } else { + time.Sleep(100 * time.Millisecond) + goto CHECK2 + } + } + } +} diff --git a/consul/serf.go b/consul/serf.go index 28dcff4f8f..26fd2c475e 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -1,10 +1,8 @@ package consul import ( - "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" "net" - "time" ) // lanEventHandler is used to handle events from the lan Serf cluster @@ -14,7 +12,6 @@ func (s *Server) lanEventHandler() { case e := <-s.eventChLAN: switch e.EventType() { case serf.EventMemberJoin: - s.localJoin(e.(serf.MemberEvent)) fallthrough case serf.EventMemberLeave: fallthrough @@ -62,26 +59,12 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) { return } - // Dispatch an async handler for each member + // Queue the members for reconciliation for _, m := range me.Members { - go s.reconcileMember(m) - } -} - -// localJoin is used to handle join events on the lan serf cluster -func (s *Server) localJoin(me serf.MemberEvent) { - // Check for consul members - for _, m := range me.Members { - ok, dc, port := isConsulServer(m) - if !ok { - continue + select { + case s.reconcileCh <- m: + default: } - if dc != s.config.Datacenter { - s.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster", - m.Name, dc) - continue - } - go s.joinConsulServer(m, port) } } @@ -147,60 +130,3 @@ func (s *Server) remoteFailed(me serf.MemberEvent) { s.remoteLock.Unlock() } } - -// joinConsulServer is used to try to join another consul server -func (s *Server) joinConsulServer(m serf.Member, port int) { - if m.Name == s.config.NodeName { - return - } - var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} - var future raft.Future - -CHECK: - // Get the Raft peers - peers, err := s.raftPeers.Peers() - if err != nil { - s.logger.Printf("[ERR] consul: failed to get raft peers: %v", err) - goto WAIT - } - - // Bail if this node is already a peer - for _, p := range peers { - if p.String() == addr.String() { - return - } - } - - // Bail if the node is not alive - if memberStatus(s.serfLAN.Members(), m.Name) != serf.StatusAlive { - return - } - - // Attempt to add as a peer - future = s.raft.AddPeer(addr) - if err := future.Error(); err != nil { - s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err) - } else { - return - } - -WAIT: - time.Sleep(500 * time.Millisecond) - select { - case <-s.shutdownCh: - return - default: - goto CHECK - } -} - -// memberStatus scans a list of members for a matching one, -// returning the status or StatusNone -func memberStatus(members []serf.Member, name string) serf.MemberStatus { - for _, m := range members { - if m.Name == name { - return m.Status - } - } - return serf.StatusNone -} diff --git a/consul/server.go b/consul/server.go index fc0aa55254..e158580c76 100644 --- a/consul/server.go +++ b/consul/server.go @@ -53,6 +53,11 @@ type Server struct { raftStore *raft.MDBStore raftTransport *raft.NetworkTransport + // reconcileCh is used to pass events from the serf handler + // into the leader manager, so that the strong state can be + // updated + reconcileCh chan serf.Member + // remoteConsuls is used to track the known consuls in // remote data centers. Used to do DC forwarding. remoteConsuls map[string][]net.Addr @@ -110,6 +115,7 @@ func NewServer(config *Config) (*Server, error) { eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), logger: logger, + reconcileCh: make(chan serf.Member, 32), remoteConsuls: make(map[string][]net.Addr), rpcClients: make(map[net.Conn]struct{}), rpcServer: rpc.NewServer(), @@ -202,7 +208,7 @@ func (s *Server) setupRaft() error { } // Create a transport layer - trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput) + trans := raft.NewNetworkTransport(s.raftLayer, 3, 500*time.Millisecond, s.config.LogOutput) s.raftTransport = trans // Setup the peer store diff --git a/consul/server_test.go b/consul/server_test.go index fdff0f4627..6c8151c62d 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -64,7 +64,7 @@ func testServerDCBootstrap(t *testing.T, dc string, bootstrap bool) (string, *Se config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond config.RaftConfig.ElectionTimeout = 40 * time.Millisecond - config.ReconcileInterval = 50 * time.Millisecond + config.ReconcileInterval = 100 * time.Millisecond server, err := NewServer(config) if err != nil {