Track remote consul servers

This commit is contained in:
Armon Dadgar 2013-12-11 16:24:34 -08:00
parent 7bc7d4cd4f
commit d4476e3df6
4 changed files with 93 additions and 25 deletions

View File

@ -12,7 +12,6 @@ type Catalog struct {
/*
* Register : Registers that a node provides a given service
* Deregister : Deregisters that a node provides a given service
* RemoveNode: Used to force remove a node
* ListDatacenters: List the known datacenters
* ListServices : Lists the available services

View File

@ -42,11 +42,10 @@ func (s *Server) wanEventHandler() {
case serf.EventMemberJoin:
s.remoteJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave:
s.remoteLeave(e.(serf.MemberEvent))
fallthrough
case serf.EventMemberFailed:
s.remoteFailed(e.(serf.MemberEvent))
case serf.EventUser:
s.remoteEvent(e.(serf.UserEvent))
default:
s.logger.Printf("[WARN] Unhandled LAN Serf Event: %#v", e)
}
@ -62,15 +61,16 @@ func (s *Server) localJoin(me serf.MemberEvent) {
// Check for consul members
for _, m := range me.Members {
ok, dc, port := s.isConsulServer(m)
if ok {
if !ok {
continue
}
if dc != s.config.Datacenter {
s.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster",
m.Name, dc)
return
continue
}
go s.joinConsulServer(m, port)
}
}
}
// localLeave is used to handle leave events on the lan serf cluster
@ -87,18 +87,65 @@ func (s *Server) localEvent(ue serf.UserEvent) {
// remoteJoin is used to handle join events on the wan serf cluster
func (s *Server) remoteJoin(me serf.MemberEvent) {
}
for _, m := range me.Members {
ok, dc, port := s.isConsulServer(m)
if !ok {
s.logger.Printf("[WARN] Non-Consul server in WAN pool: %s %s", m.Name)
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
s.logger.Printf("[INFO] Adding Consul server (Datacenter: %s) (Addr: %s)", dc, addr)
// remoteLeave is used to handle leave events on the wan serf cluster
func (s *Server) remoteLeave(me serf.MemberEvent) {
// Check if this server is known
found := false
s.remoteLock.Lock()
existing := s.remoteConsuls[dc]
for _, e := range existing {
if e.String() == addr.String() {
found = true
break
}
}
// Add ot the list if not known
if !found {
s.remoteConsuls[dc] = append(existing, addr)
}
s.remoteLock.Unlock()
}
}
// remoteFailed is used to handle fail events on the wan serf cluster
func (s *Server) remoteFailed(me serf.MemberEvent) {
}
for _, m := range me.Members {
ok, dc, port := s.isConsulServer(m)
if !ok {
continue
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
s.logger.Printf("[INFO] Removing Consul server (Datacenter: %s) (Addr: %s)", dc, addr)
// remoteEvent is used to handle events on the wan serf cluster
func (s *Server) remoteEvent(ue serf.UserEvent) {
// Remove the server if known
s.remoteLock.Lock()
existing := s.remoteConsuls[dc]
n := len(existing)
for i := 0; i < n; i++ {
if existing[i].String() == addr.String() {
existing[i], existing[n-1] = existing[n-1], nil
existing = existing[:n-1]
n--
break
}
}
// Trim the list if all known consuls are dead
if n == 0 {
delete(s.remoteConsuls, dc)
} else {
s.remoteConsuls[dc] = existing
}
s.remoteLock.Unlock()
}
}
// Returns if a member is a consul server. Returns a bool,

View File

@ -50,6 +50,11 @@ type Server struct {
raftStore *raft.SQLiteStore
raftTransport *raft.NetworkTransport
// remoteConsuls is used to track the known consuls in
// remote data centers. Used to do DC forwarding.
remoteConsuls map[string][]net.Addr
remoteLock sync.RWMutex
// rpcClients is used to track active clients
rpcClients map[net.Conn]struct{}
rpcClientLock sync.Mutex
@ -94,6 +99,7 @@ func NewServer(config *Config) (*Server, error) {
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
remoteConsuls: make(map[string][]net.Addr),
rpcClients: make(map[net.Conn]struct{}),
rpcServer: rpc.NewServer(),
shutdownCh: make(chan struct{}),

View File

@ -25,8 +25,13 @@ func tmpDir(t *testing.T) string {
}
func testServer(t *testing.T) (string, *Server) {
return testServerDC(t, "dc1")
}
func testServerDC(t *testing.T, dc string) (string, *Server) {
dir := tmpDir(t)
config := DefaultConfig()
config.Datacenter = dc
config.DataDir = dir
// Adjust the ports
@ -108,7 +113,7 @@ func TestServer_JoinWAN(t *testing.T) {
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
@ -127,6 +132,17 @@ func TestServer_JoinWAN(t *testing.T) {
if len(s2.WANMembers()) != 2 {
t.Fatalf("bad len")
}
time.Sleep(10 * time.Millisecond)
// Check the remoteConsuls has both
if len(s1.remoteConsuls) != 2 {
t.Fatalf("remote consul missing")
}
if len(s2.remoteConsuls) != 2 {
t.Fatalf("remote consul missing")
}
}
func TestServer_Leave(t *testing.T) {