rpc: discover serf wan port before starting serf lan

When using dynamic ports for the serf clusters, the actual
bind port of the serf WAN cluster needs to be discovered
before the serf LAN cluster is started, since the serf LAN
cluster announces the port of the WAN cluster.
This commit is contained in:
Frank Schroeder 2017-06-27 10:55:55 +02:00 committed by Frank Schröder
parent 53eab7e970
commit c8ef588d8d

View File

@ -336,23 +336,67 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) {
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
// Serf and dynamic bind ports
//
// The LAN serf cluster announces the port of the WAN serf cluster
// which creates a race when the WAN cluster is supposed to bind to
// a dynamic port (port 0).
//
// The current memberlist implementation updates the BindPort field
// to the actual port of the TCP listener after startup if the
// BindPort is zero. However, this update is not guarded by a lock,
// and since BindPort is used for both UDP and TCP, the actual ports
// for the two protocols almost certainly differ.
//
// In production deployments the memberlist port should not be set
// to zero since the node needs to be discoverable by others and the
// shared port number enables this and allows for consistent
// firewall rules.
//
// Therefore, BindPort zero is used solely for testing where lots of
// separate memberlist clusters are running concurrently on the same
// machine and on individual ports where they can reach each other via
// TCP.
//
// This leaves the data race on the config.BindPort field. To
// mitigate (not solve) the problem without refactoring the
// memberlist networking code, we first store the bind port value in
// a local variable and, if it is zero, poll the value in the config
// until it becomes non-zero. This is still racy but should work
// in the cases we care about.
// Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
// See the big comment above for why we are doing this.
if serfBindPortWAN == 0 {
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN != 0 {
break
}
time.Sleep(time.Millisecond)
}
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
// Initialize the LAN Serf.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig,
s.eventChLAN, serfLANSnapshot, false)
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
go s.lanEventHandler()
// Initialize the WAN Serf.
s.serfWAN, err = s.setupSerf(config.SerfWANConfig,
s.eventChWAN, serfWANSnapshot, true)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
s.Shutdown()
@ -391,14 +435,14 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) {
}
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool) (*serf.Serf, error) {
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int) (*serf.Serf, error) {
addr := s.Listener.Addr().(*net.TCPAddr)
conf.Init()
if wan {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", s.config.SerfWANConfig.MemberlistConfig.BindPort)
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter