From 9e8cdb6c7a8db6a7208fe37f175c0819b2e19d0e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 6 Dec 2013 17:18:09 -0800 Subject: [PATCH] consul: Add+test JoinLAN/JoinWAN --- consul/config.go | 30 ++++++++++---------- consul/serf.go | 2 +- consul/server.go | 20 +++++++++++-- consul/server_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 18 deletions(-) diff --git a/consul/config.go b/consul/config.go index acda4840ec..5ee7032004 100644 --- a/consul/config.go +++ b/consul/config.go @@ -36,11 +36,11 @@ type Config struct { // by the WAN and LAN RPCAddr string - // SerfLocalConfig is the configuration for the local serf - SerfLocalConfig *serf.Config + // SerfLANConfig is the configuration for the intra-dc serf + SerfLANConfig *serf.Config - // SerfRemoteConfig is the configuration for the remtoe serf - SerfRemoteConfig *serf.Config + // SerfWANConfig is the configuration for the cross-dc serf + SerfWANConfig *serf.Config // LogOutput is the location to write logs to. If this is not set, // logs will go to stderr. @@ -55,22 +55,22 @@ func DefaultConfig() *Config { } conf := &Config{ - Datacenter: "dc1", - NodeName: hostname, - RaftBindAddr: DefaultRaftAddr, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLocalConfig: serf.DefaultConfig(), - SerfRemoteConfig: serf.DefaultConfig(), + Datacenter: "dc1", + NodeName: hostname, + RaftBindAddr: DefaultRaftAddr, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: serf.DefaultConfig(), + SerfWANConfig: serf.DefaultConfig(), } - // Remote Serf should use the WAN timing, since we are using it + // WAN Serf should use the WAN timing, since we are using it // to communicate between DC's - conf.SerfRemoteConfig.MemberlistConfig = memberlist.DefaultWANConfig() + conf.SerfWANConfig.MemberlistConfig = memberlist.DefaultWANConfig() // Ensure we don't have port conflicts - conf.SerfLocalConfig.MemberlistConfig.Port = DefaultLANSerfPort - conf.SerfRemoteConfig.MemberlistConfig.Port = DefaultWANSerfPort + conf.SerfLANConfig.MemberlistConfig.Port = DefaultLANSerfPort + conf.SerfWANConfig.MemberlistConfig.Port = DefaultWANSerfPort return conf } diff --git a/consul/serf.go b/consul/serf.go index 6ef6a14ada..64bfc0c877 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -1,4 +1,4 @@ -package connsul +package consul // lanEventHandler is used to handle events from the lan Serf cluster func (s *Server) lanEventHandler() { diff --git a/consul/server.go b/consul/server.go index 69e8262ce3..e1d3a9df72 100644 --- a/consul/server.go +++ b/consul/server.go @@ -99,7 +99,7 @@ func NewServer(config *Config) (*Server, error) { // Initialize the lan Serf var err error - s.serfLAN, err = s.setupSerf(config.SerfLocalConfig, + s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot) if err != nil { s.Shutdown() @@ -107,7 +107,7 @@ func NewServer(config *Config) (*Server, error) { } // Initialize the wan Serf - s.serfWAN, err = s.setupSerf(config.SerfRemoteConfig, + s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot) if err != nil { s.Shutdown() @@ -251,3 +251,19 @@ func (s *Server) Shutdown() error { return nil } + +// JoinLAN is used to have Consul join the inner-DC pool +// The target address should be another node inside the DC +// listening on the Serf LAN address +func (s *Server) JoinLAN(addr string) error { + _, err := s.serfLAN.Join([]string{addr}, false) + return err +} + +// JoinWAN is used to have Consul join the cross-WAN Consul ring +// The target address should be another node listening on the +// Serf WAN address +func (s *Server) JoinWAN(addr string) error { + _, err := s.serfWAN.Join([]string{addr}, false) + return err +} diff --git a/consul/server_test.go b/consul/server_test.go index 040ccbf0ef..4d13d73518 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -1,11 +1,20 @@ package consul import ( + "fmt" "io/ioutil" "os" "testing" ) +var nextPort = 15000 + +func getPort() int { + p := nextPort + nextPort++ + return p +} + func tmpDir(t *testing.T) string { dir, err := ioutil.TempDir("", "consul") if err != nil { @@ -14,6 +23,28 @@ func tmpDir(t *testing.T) string { return dir } +func testServer(t *testing.T) (string, *Server) { + dir := tmpDir(t) + config := DefaultConfig() + config.DataDir = dir + + // Adjust the ports + p := getPort() + config.NodeName = fmt.Sprintf("Node %d", p) + config.RaftBindAddr = fmt.Sprintf("127.0.0.1:%d", p) + config.RPCAddr = fmt.Sprintf("127.0.0.1:%d", getPort()) + config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1" + config.SerfLANConfig.MemberlistConfig.Port = getPort() + config.SerfWANConfig.MemberlistConfig.BindAddr = "127.0.0.1" + config.SerfWANConfig.MemberlistConfig.Port = getPort() + + server, err := NewServer(config) + if err != nil { + t.Fatalf("err: %v", err) + } + return dir, server +} + func TestServer_StartStop(t *testing.T) { dir := tmpDir(t) defer os.RemoveAll(dir) @@ -35,3 +66,38 @@ func TestServer_StartStop(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestServer_JoinLAN(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServer(t) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.Port) + if err := s2.JoinLAN(addr); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestServer_JoinWAN(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServer(t) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.Port) + if err := s2.JoinWAN(addr); err != nil { + t.Fatalf("err: %v", err) + } + t.Fatalf("fail") +}