diff --git a/consul/raft_endpoint.go b/consul/raft_endpoint.go index 96a6602012..71bc82d487 100644 --- a/consul/raft_endpoint.go +++ b/consul/raft_endpoint.go @@ -30,6 +30,6 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error { r.server.logger.Printf("[ERR] Failed to parse peer: %v", err) return err } - future := r.server.raft.AddPeer(peer) + future := r.server.raft.RemovePeer(peer) return future.Error() } diff --git a/consul/serf.go b/consul/serf.go index 7aedd2be4f..d5f6d81971 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -1,7 +1,12 @@ package consul import ( + "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "net" + "strconv" + "strings" + "time" ) // lanEventHandler is used to handle events from the lan Serf cluster @@ -54,6 +59,18 @@ func (s *Server) wanEventHandler() { // localJoin is used to handle join events on the lan serf cluster func (s *Server) localJoin(me serf.MemberEvent) { + // Check for consul members + for _, m := range me.Members { + ok, dc, port := s.isConsulServer(m) + if ok { + if dc != s.config.Datacenter { + s.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster", + m.Name, dc) + return + } + go s.joinConsulServer(m, port) + } + } } // localLeave is used to handle leave events on the lan serf cluster @@ -83,3 +100,59 @@ func (s *Server) remoteFailed(me serf.MemberEvent) { // remoteEvent is used to handle events on the wan serf cluster func (s *Server) remoteEvent(ue serf.UserEvent) { } + +// Returns if a member is a consul server. Returns a bool, +// the data center, and the rpc port +func (s *Server) isConsulServer(m serf.Member) (bool, string, int) { + role := m.Role + if !strings.HasPrefix(role, "consul:") { + return false, "", 0 + } + + parts := strings.SplitN(role, ":", 3) + datacenter := parts[1] + port_str := parts[2] + port, err := strconv.Atoi(port_str) + if err != nil { + s.logger.Printf("[ERR] Failed to parse role: %s", role) + return false, "", 0 + } + + return true, datacenter, port +} + +// joinConsulServer is used to try to join another consul server +func (s *Server) joinConsulServer(m serf.Member, port int) { + if m.Name == s.config.NodeName { + return + } + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} + var future raft.Future + +CHECK: + // Get the Raft peers + peers, err := s.raftPeers.Peers() + if err != nil { + s.logger.Printf("[ERR] Failed to get raft peers: %v", err) + goto WAIT + } + + // Bail if this node is already a peer + for _, p := range peers { + if p.String() == addr.String() { + return + } + } + + // Attempt to add as a peer + future = s.raft.AddPeer(addr) + if err := future.Error(); err != nil { + s.logger.Printf("[ERR] Failed to add raft peer: %v", err) + } else { + return + } + +WAIT: + time.Sleep(500 * time.Millisecond) + goto CHECK +} diff --git a/consul/server.go b/consul/server.go index 7a70d14faa..b194569391 100644 --- a/consul/server.go +++ b/consul/server.go @@ -145,8 +145,9 @@ func (s *Server) ensurePath(path string, dir bool) error { // setupSerf is used to setup and initialize a Serf func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { + addr := s.rpcListener.Addr().(*net.TCPAddr) conf.NodeName = s.config.NodeName - conf.Role = fmt.Sprintf("consul:%s:%s", s.config.Datacenter, s.rpcListener.Addr().String()) + conf.Role = fmt.Sprintf("consul:%s:%d", s.config.Datacenter, addr.Port) conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput conf.EventCh = ch @@ -231,12 +232,10 @@ func (s *Server) Shutdown() error { if s.serfLAN != nil { s.serfLAN.Shutdown() - s.serfLAN = nil } if s.serfWAN != nil { s.serfWAN.Shutdown() - s.serfWAN = nil } if s.raft != nil { @@ -244,14 +243,10 @@ func (s *Server) Shutdown() error { s.raftLayer.Close() s.raft.Shutdown() s.raftStore.Close() - s.raft = nil - s.raftLayer = nil - s.raftStore = nil } if s.rpcListener != nil { s.rpcListener.Close() - s.rpcListener = nil } // Close all the RPC connections diff --git a/consul/server_test.go b/consul/server_test.go index 76ed9b1727..7e40be6773 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "testing" + "time" ) var nextPort = 15000 @@ -99,3 +100,43 @@ func TestServer_JoinWAN(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestServer_Leave(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServer(t) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.Port) + if err := s2.JoinLAN(addr); err != nil { + t.Fatalf("err: %v", err) + } + + time.Sleep(time.Second) + + p1, _ := s1.raftPeers.Peers() + if len(p1) != 2 { + t.Fatalf("should have 2 peers: %v", p1) + } + + p2, _ := s2.raftPeers.Peers() + if len(p2) != 2 { + t.Fatalf("should have 2 peers: %v", p2) + } + + // Issue a leave + if err := s2.Leave(); err != nil { + t.Fatalf("err: %v", err) + } + + // Should lose a peer + p1, _ = s1.raftPeers.Peers() + if len(p1) != 1 { + t.Fatalf("should have 1 peer: %v", p1) + } +}