consul: Add+test JoinLAN/JoinWAN

This commit is contained in:
Armon Dadgar 2013-12-06 17:18:09 -08:00
parent 41b3ca7da9
commit 9e8cdb6c7a
4 changed files with 100 additions and 18 deletions

View File

@ -36,11 +36,11 @@ type Config struct {
// by the WAN and LAN // by the WAN and LAN
RPCAddr string RPCAddr string
// SerfLocalConfig is the configuration for the local serf // SerfLANConfig is the configuration for the intra-dc serf
SerfLocalConfig *serf.Config SerfLANConfig *serf.Config
// SerfRemoteConfig is the configuration for the remtoe serf // SerfWANConfig is the configuration for the cross-dc serf
SerfRemoteConfig *serf.Config SerfWANConfig *serf.Config
// LogOutput is the location to write logs to. If this is not set, // LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr. // logs will go to stderr.
@ -55,22 +55,22 @@ func DefaultConfig() *Config {
} }
conf := &Config{ conf := &Config{
Datacenter: "dc1", Datacenter: "dc1",
NodeName: hostname, NodeName: hostname,
RaftBindAddr: DefaultRaftAddr, RaftBindAddr: DefaultRaftAddr,
RPCAddr: DefaultRPCAddr, RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(), RaftConfig: raft.DefaultConfig(),
SerfLocalConfig: serf.DefaultConfig(), SerfLANConfig: serf.DefaultConfig(),
SerfRemoteConfig: serf.DefaultConfig(), SerfWANConfig: serf.DefaultConfig(),
} }
// Remote Serf should use the WAN timing, since we are using it // WAN Serf should use the WAN timing, since we are using it
// to communicate between DC's // to communicate between DC's
conf.SerfRemoteConfig.MemberlistConfig = memberlist.DefaultWANConfig() conf.SerfWANConfig.MemberlistConfig = memberlist.DefaultWANConfig()
// Ensure we don't have port conflicts // Ensure we don't have port conflicts
conf.SerfLocalConfig.MemberlistConfig.Port = DefaultLANSerfPort conf.SerfLANConfig.MemberlistConfig.Port = DefaultLANSerfPort
conf.SerfRemoteConfig.MemberlistConfig.Port = DefaultWANSerfPort conf.SerfWANConfig.MemberlistConfig.Port = DefaultWANSerfPort
return conf return conf
} }

View File

@ -1,4 +1,4 @@
package connsul package consul
// lanEventHandler is used to handle events from the lan Serf cluster // lanEventHandler is used to handle events from the lan Serf cluster
func (s *Server) lanEventHandler() { func (s *Server) lanEventHandler() {

View File

@ -99,7 +99,7 @@ func NewServer(config *Config) (*Server, error) {
// Initialize the lan Serf // Initialize the lan Serf
var err error var err error
s.serfLAN, err = s.setupSerf(config.SerfLocalConfig, s.serfLAN, err = s.setupSerf(config.SerfLANConfig,
s.eventChLAN, serfLANSnapshot) s.eventChLAN, serfLANSnapshot)
if err != nil { if err != nil {
s.Shutdown() s.Shutdown()
@ -107,7 +107,7 @@ func NewServer(config *Config) (*Server, error) {
} }
// Initialize the wan Serf // Initialize the wan Serf
s.serfWAN, err = s.setupSerf(config.SerfRemoteConfig, s.serfWAN, err = s.setupSerf(config.SerfWANConfig,
s.eventChWAN, serfWANSnapshot) s.eventChWAN, serfWANSnapshot)
if err != nil { if err != nil {
s.Shutdown() s.Shutdown()
@ -251,3 +251,19 @@ func (s *Server) Shutdown() error {
return nil return nil
} }
// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (s *Server) JoinLAN(addr string) error {
_, err := s.serfLAN.Join([]string{addr}, false)
return err
}
// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addr string) error {
_, err := s.serfWAN.Join([]string{addr}, false)
return err
}

View File

@ -1,11 +1,20 @@
package consul package consul
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
) )
var nextPort = 15000
func getPort() int {
p := nextPort
nextPort++
return p
}
func tmpDir(t *testing.T) string { func tmpDir(t *testing.T) string {
dir, err := ioutil.TempDir("", "consul") dir, err := ioutil.TempDir("", "consul")
if err != nil { if err != nil {
@ -14,6 +23,28 @@ func tmpDir(t *testing.T) string {
return dir return dir
} }
func testServer(t *testing.T) (string, *Server) {
dir := tmpDir(t)
config := DefaultConfig()
config.DataDir = dir
// Adjust the ports
p := getPort()
config.NodeName = fmt.Sprintf("Node %d", p)
config.RaftBindAddr = fmt.Sprintf("127.0.0.1:%d", p)
config.RPCAddr = fmt.Sprintf("127.0.0.1:%d", getPort())
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfLANConfig.MemberlistConfig.Port = getPort()
config.SerfWANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfWANConfig.MemberlistConfig.Port = getPort()
server, err := NewServer(config)
if err != nil {
t.Fatalf("err: %v", err)
}
return dir, server
}
func TestServer_StartStop(t *testing.T) { func TestServer_StartStop(t *testing.T) {
dir := tmpDir(t) dir := tmpDir(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -35,3 +66,38 @@ func TestServer_StartStop(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
func TestServer_JoinLAN(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.Port)
if err := s2.JoinLAN(addr); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestServer_JoinWAN(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.Port)
if err := s2.JoinWAN(addr); err != nil {
t.Fatalf("err: %v", err)
}
t.Fatalf("fail")
}