mirror of https://github.com/status-im/consul.git
consul: Adding tons of shit, leave test
This commit is contained in:
parent
c4a4b9df34
commit
207171f264
|
@ -30,6 +30,6 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error {
|
|||
r.server.logger.Printf("[ERR] Failed to parse peer: %v", err)
|
||||
return err
|
||||
}
|
||||
future := r.server.raft.AddPeer(peer)
|
||||
future := r.server.raft.RemovePeer(peer)
|
||||
return future.Error()
|
||||
}
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// lanEventHandler is used to handle events from the lan Serf cluster
|
||||
|
@ -54,6 +59,18 @@ func (s *Server) wanEventHandler() {
|
|||
|
||||
// localJoin is used to handle join events on the lan serf cluster
|
||||
func (s *Server) localJoin(me serf.MemberEvent) {
|
||||
// Check for consul members
|
||||
for _, m := range me.Members {
|
||||
ok, dc, port := s.isConsulServer(m)
|
||||
if ok {
|
||||
if dc != s.config.Datacenter {
|
||||
s.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster",
|
||||
m.Name, dc)
|
||||
return
|
||||
}
|
||||
go s.joinConsulServer(m, port)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// localLeave is used to handle leave events on the lan serf cluster
|
||||
|
@ -83,3 +100,59 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
|
|||
// remoteEvent is used to handle events on the wan serf cluster
|
||||
func (s *Server) remoteEvent(ue serf.UserEvent) {
|
||||
}
|
||||
|
||||
// Returns if a member is a consul server. Returns a bool,
|
||||
// the data center, and the rpc port
|
||||
func (s *Server) isConsulServer(m serf.Member) (bool, string, int) {
|
||||
role := m.Role
|
||||
if !strings.HasPrefix(role, "consul:") {
|
||||
return false, "", 0
|
||||
}
|
||||
|
||||
parts := strings.SplitN(role, ":", 3)
|
||||
datacenter := parts[1]
|
||||
port_str := parts[2]
|
||||
port, err := strconv.Atoi(port_str)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] Failed to parse role: %s", role)
|
||||
return false, "", 0
|
||||
}
|
||||
|
||||
return true, datacenter, port
|
||||
}
|
||||
|
||||
// joinConsulServer is used to try to join another consul server
|
||||
func (s *Server) joinConsulServer(m serf.Member, port int) {
|
||||
if m.Name == s.config.NodeName {
|
||||
return
|
||||
}
|
||||
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
|
||||
var future raft.Future
|
||||
|
||||
CHECK:
|
||||
// Get the Raft peers
|
||||
peers, err := s.raftPeers.Peers()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] Failed to get raft peers: %v", err)
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Bail if this node is already a peer
|
||||
for _, p := range peers {
|
||||
if p.String() == addr.String() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to add as a peer
|
||||
future = s.raft.AddPeer(addr)
|
||||
if err := future.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] Failed to add raft peer: %v", err)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
||||
WAIT:
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
goto CHECK
|
||||
}
|
||||
|
|
|
@ -145,8 +145,9 @@ func (s *Server) ensurePath(path string, dir bool) error {
|
|||
|
||||
// setupSerf is used to setup and initialize a Serf
|
||||
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
|
||||
addr := s.rpcListener.Addr().(*net.TCPAddr)
|
||||
conf.NodeName = s.config.NodeName
|
||||
conf.Role = fmt.Sprintf("consul:%s:%s", s.config.Datacenter, s.rpcListener.Addr().String())
|
||||
conf.Role = fmt.Sprintf("consul:%s:%d", s.config.Datacenter, addr.Port)
|
||||
conf.MemberlistConfig.LogOutput = s.config.LogOutput
|
||||
conf.LogOutput = s.config.LogOutput
|
||||
conf.EventCh = ch
|
||||
|
@ -231,12 +232,10 @@ func (s *Server) Shutdown() error {
|
|||
|
||||
if s.serfLAN != nil {
|
||||
s.serfLAN.Shutdown()
|
||||
s.serfLAN = nil
|
||||
}
|
||||
|
||||
if s.serfWAN != nil {
|
||||
s.serfWAN.Shutdown()
|
||||
s.serfWAN = nil
|
||||
}
|
||||
|
||||
if s.raft != nil {
|
||||
|
@ -244,14 +243,10 @@ func (s *Server) Shutdown() error {
|
|||
s.raftLayer.Close()
|
||||
s.raft.Shutdown()
|
||||
s.raftStore.Close()
|
||||
s.raft = nil
|
||||
s.raftLayer = nil
|
||||
s.raftStore = nil
|
||||
}
|
||||
|
||||
if s.rpcListener != nil {
|
||||
s.rpcListener.Close()
|
||||
s.rpcListener = nil
|
||||
}
|
||||
|
||||
// Close all the RPC connections
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var nextPort = 15000
|
||||
|
@ -99,3 +100,43 @@ func TestServer_JoinWAN(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Leave(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
dir2, s2 := testServer(t)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
s1.config.SerfLANConfig.MemberlistConfig.Port)
|
||||
if err := s2.JoinLAN(addr); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
p1, _ := s1.raftPeers.Peers()
|
||||
if len(p1) != 2 {
|
||||
t.Fatalf("should have 2 peers: %v", p1)
|
||||
}
|
||||
|
||||
p2, _ := s2.raftPeers.Peers()
|
||||
if len(p2) != 2 {
|
||||
t.Fatalf("should have 2 peers: %v", p2)
|
||||
}
|
||||
|
||||
// Issue a leave
|
||||
if err := s2.Leave(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should lose a peer
|
||||
p1, _ = s1.raftPeers.Peers()
|
||||
if len(p1) != 1 {
|
||||
t.Fatalf("should have 1 peer: %v", p1)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue