From d4476e3df68546ee222a4ba7c06ade24e156b781 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 11 Dec 2013 16:24:34 -0800 Subject: [PATCH] Track remote consul servers --- consul/catalog_endpoint.go | 1 - consul/serf.go | 77 ++++++++++++++++++++++++++++++-------- consul/server.go | 22 +++++++---- consul/server_test.go | 18 ++++++++- 4 files changed, 93 insertions(+), 25 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 285015483d..89cee110fb 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -12,7 +12,6 @@ type Catalog struct { /* * Register : Registers that a node provides a given service * Deregister : Deregisters that a node provides a given service -* RemoveNode: Used to force remove a node * ListDatacenters: List the known datacenters * ListServices : Lists the available services diff --git a/consul/serf.go b/consul/serf.go index ff832299aa..b1cb828aec 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -42,11 +42,10 @@ func (s *Server) wanEventHandler() { case serf.EventMemberJoin: s.remoteJoin(e.(serf.MemberEvent)) case serf.EventMemberLeave: - s.remoteLeave(e.(serf.MemberEvent)) + fallthrough case serf.EventMemberFailed: s.remoteFailed(e.(serf.MemberEvent)) case serf.EventUser: - s.remoteEvent(e.(serf.UserEvent)) default: s.logger.Printf("[WARN] Unhandled LAN Serf Event: %#v", e) } @@ -62,14 +61,15 @@ func (s *Server) localJoin(me serf.MemberEvent) { // Check for consul members for _, m := range me.Members { ok, dc, port := s.isConsulServer(m) - if ok { - if dc != s.config.Datacenter { - s.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster", - m.Name, dc) - return - } - go s.joinConsulServer(m, port) + if !ok { + continue } + if dc != s.config.Datacenter { + s.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster", + m.Name, dc) + continue + } + go s.joinConsulServer(m, port) } } @@ -87,18 +87,65 @@ func (s *Server) localEvent(ue serf.UserEvent) { // remoteJoin is used to handle join events on the wan serf cluster func (s *Server) remoteJoin(me serf.MemberEvent) { -} + for _, m := range me.Members { + ok, dc, port := s.isConsulServer(m) + if !ok { + s.logger.Printf("[WARN] Non-Consul server in WAN pool: %s %s", m.Name) + continue + } + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} + s.logger.Printf("[INFO] Adding Consul server (Datacenter: %s) (Addr: %s)", dc, addr) -// remoteLeave is used to handle leave events on the wan serf cluster -func (s *Server) remoteLeave(me serf.MemberEvent) { + // Check if this server is known + found := false + s.remoteLock.Lock() + existing := s.remoteConsuls[dc] + for _, e := range existing { + if e.String() == addr.String() { + found = true + break + } + } + + // Add ot the list if not known + if !found { + s.remoteConsuls[dc] = append(existing, addr) + } + s.remoteLock.Unlock() + } } // remoteFailed is used to handle fail events on the wan serf cluster func (s *Server) remoteFailed(me serf.MemberEvent) { -} + for _, m := range me.Members { + ok, dc, port := s.isConsulServer(m) + if !ok { + continue + } + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} + s.logger.Printf("[INFO] Removing Consul server (Datacenter: %s) (Addr: %s)", dc, addr) -// remoteEvent is used to handle events on the wan serf cluster -func (s *Server) remoteEvent(ue serf.UserEvent) { + // Remove the server if known + s.remoteLock.Lock() + existing := s.remoteConsuls[dc] + n := len(existing) + for i := 0; i < n; i++ { + if existing[i].String() == addr.String() { + existing[i], existing[n-1] = existing[n-1], nil + existing = existing[:n-1] + n-- + break + } + } + + // Trim the list if all known consuls are dead + if n == 0 { + delete(s.remoteConsuls, dc) + } else { + s.remoteConsuls[dc] = existing + } + s.remoteLock.Unlock() + } } // Returns if a member is a consul server. Returns a bool, diff --git a/consul/server.go b/consul/server.go index 6d9a50eb35..33034c1956 100644 --- a/consul/server.go +++ b/consul/server.go @@ -50,6 +50,11 @@ type Server struct { raftStore *raft.SQLiteStore raftTransport *raft.NetworkTransport + // remoteConsuls is used to track the known consuls in + // remote data centers. Used to do DC forwarding. + remoteConsuls map[string][]net.Addr + remoteLock sync.RWMutex + // rpcClients is used to track active clients rpcClients map[net.Conn]struct{} rpcClientLock sync.Mutex @@ -89,14 +94,15 @@ func NewServer(config *Config) (*Server, error) { // Create server s := &Server{ - config: config, - connPool: NewPool(5), - eventChLAN: make(chan serf.Event, 256), - eventChWAN: make(chan serf.Event, 256), - logger: logger, - rpcClients: make(map[net.Conn]struct{}), - rpcServer: rpc.NewServer(), - shutdownCh: make(chan struct{}), + config: config, + connPool: NewPool(5), + eventChLAN: make(chan serf.Event, 256), + eventChWAN: make(chan serf.Event, 256), + logger: logger, + remoteConsuls: make(map[string][]net.Addr), + rpcClients: make(map[net.Conn]struct{}), + rpcServer: rpc.NewServer(), + shutdownCh: make(chan struct{}), } // Initialize the RPC layer diff --git a/consul/server_test.go b/consul/server_test.go index 2f1e0edd48..1d26a9ffa4 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -25,8 +25,13 @@ func tmpDir(t *testing.T) string { } func testServer(t *testing.T) (string, *Server) { + return testServerDC(t, "dc1") +} + +func testServerDC(t *testing.T, dc string) (string, *Server) { dir := tmpDir(t) config := DefaultConfig() + config.Datacenter = dc config.DataDir = dir // Adjust the ports @@ -108,7 +113,7 @@ func TestServer_JoinWAN(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - dir2, s2 := testServer(t) + dir2, s2 := testServerDC(t, "dc2") defer os.RemoveAll(dir2) defer s2.Shutdown() @@ -127,6 +132,17 @@ func TestServer_JoinWAN(t *testing.T) { if len(s2.WANMembers()) != 2 { t.Fatalf("bad len") } + + time.Sleep(10 * time.Millisecond) + + // Check the remoteConsuls has both + if len(s1.remoteConsuls) != 2 { + t.Fatalf("remote consul missing") + } + + if len(s2.remoteConsuls) != 2 { + t.Fatalf("remote consul missing") + } } func TestServer_Leave(t *testing.T) {