diff --git a/agent/agent.go b/agent/agent.go index b0a59022db..071bdfd376 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -648,30 +648,12 @@ func (a *Agent) consulConfig() (*consul.Config, error) { base.RPCAdvertise = a.config.AdvertiseAddrs.RPC } base.Segment = a.config.Segment - for _, segment := range a.config.Segments { - config := consul.DefaultConfig().SerfLANConfig - - config.MemberlistConfig.AdvertiseAddr = segment.Advertise - config.MemberlistConfig.AdvertisePort = segment.Port - config.MemberlistConfig.BindAddr = segment.Bind - config.MemberlistConfig.BindPort = segment.Port - if a.config.ReconnectTimeoutLan != 0 { - config.ReconnectTimeout = a.config.ReconnectTimeoutLan + if len(a.config.Segments) > 0 { + segments, err := a.segmentConfig() + if err != nil { + return nil, err } - if a.config.EncryptVerifyIncoming != nil { - config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming - } - if a.config.EncryptVerifyOutgoing != nil { - config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing - } - - base.Segments = append(base.Segments, consul.NetworkSegment{ - Name: segment.Name, - Bind: segment.Bind, - Port: segment.Port, - Advertise: segment.Advertise, - SerfConfig: config, - }) + base.Segments = segments } if a.config.Bootstrap { base.Bootstrap = true @@ -789,6 +771,49 @@ func (a *Agent) consulConfig() (*consul.Config, error) { return base, nil } +// Setup the serf and memberlist config for any defined network segments. +func (a *Agent) segmentConfig() ([]consul.NetworkSegment, error) { + var segments []consul.NetworkSegment + config := a.config + + for _, segment := range config.Segments { + serfConf := consul.DefaultConfig().SerfLANConfig + + serfConf.MemberlistConfig.AdvertiseAddr = segment.Advertise + serfConf.MemberlistConfig.AdvertisePort = segment.Port + serfConf.MemberlistConfig.BindAddr = segment.Bind + serfConf.MemberlistConfig.BindPort = segment.Port + if config.ReconnectTimeoutLan != 0 { + serfConf.ReconnectTimeout = config.ReconnectTimeoutLan + } + if config.EncryptVerifyIncoming != nil { + serfConf.MemberlistConfig.GossipVerifyIncoming = *config.EncryptVerifyIncoming + } + if config.EncryptVerifyOutgoing != nil { + serfConf.MemberlistConfig.GossipVerifyOutgoing = *config.EncryptVerifyOutgoing + } + + var rpcAddr *net.TCPAddr + if segment.RPCListener { + rpcAddr = &net.TCPAddr{ + IP: net.ParseIP(segment.Bind), + Port: a.config.Ports.Server, + } + } + + segments = append(segments, consul.NetworkSegment{ + Name: segment.Name, + Bind: segment.Bind, + Port: segment.Port, + Advertise: segment.Advertise, + RPCAddr: rpcAddr, + SerfConfig: serfConf, + }) + } + + return segments, nil +} + // makeRandomID will generate a random UUID for a node. func (a *Agent) makeRandomID() (string, error) { id, err := uuid.GenerateUUID() diff --git a/agent/config.go b/agent/config.go index 3a21adbc3a..0300effaa0 100644 --- a/agent/config.go +++ b/agent/config.go @@ -357,11 +357,16 @@ type NetworkSegment struct { Name string `mapstructure:"name"` // Bind is the bind address for this segment. - Bind string `mapstructure:"bind"` + Bind string `mapstructure:"bind"` + BindAddrs []string `mapstructure:"-"` // Port is the port for this segment. Port int `mapstructure:"port"` + // RPCListener is whether to bind a separate RPC listener on the bind address + // for this segment. + RPCListener bool `mapstructure:"rpc_listener"` + // Advertise is the advertise address of this segment. Advertise string `mapstructure:"advertise"` } @@ -1408,6 +1413,11 @@ func DecodeConfig(r io.Reader) (*Config, error) { result.AdvertiseAddrs.RPC = addr } + // Validate segment config. + if err := ValidateSegments(&result); err != nil { + return nil, err + } + // Enforce the max Raft multiplier. if result.Performance.RaftMultiplier > consul.MaxRaftMultiplier { return nil, fmt.Errorf("Performance.RaftMultiplier must be <= %d", consul.MaxRaftMultiplier) @@ -1461,31 +1471,25 @@ func DecodeConfig(r io.Reader) (*Config, error) { return nil, fmt.Errorf("Failed to parse node metadata: %v", err) } - // Validate segment config - if err := ValidateSegments(&result); err != nil { - return nil, err - } - return &result, nil } func ValidateSegments(conf *Config) error { - if conf.Server && conf.Segment != "" { - return fmt.Errorf("Segment option can only be set on clients") - } - - if !conf.Server && len(conf.Segments) > 0 { - return fmt.Errorf("Cannot define segments on clients") - } - if len(conf.Segments) > SegmentLimit { return fmt.Errorf("Cannot exceed network segment limit of %d", SegmentLimit) } + takenPorts := make(map[int]string, len(conf.Segments)) for _, segment := range conf.Segments { if len(segment.Name) > SegmentNameLimit { return fmt.Errorf("Segment name %q exceeds maximum length of %d", segment.Name, SegmentNameLimit) } + + previous, ok := takenPorts[segment.Port] + if ok { + return fmt.Errorf("Segment %q port %d overlaps with segment %q", segment.Name, segment.Port, previous) + } + takenPorts[segment.Port] = segment.Name } return nil @@ -2275,7 +2279,6 @@ func (c *Config) ResolveTmplAddrs() (err error) { for i, segment := range c.Segments { parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name)) parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name)) - } return diff --git a/agent/config_test.go b/agent/config_test.go index a6ad9af43e..a39a60ea3a 100644 --- a/agent/config_test.go +++ b/agent/config_test.go @@ -597,8 +597,15 @@ func TestDecodeConfig(t *testing.T) { c: &Config{Segment: "thing"}, }, { - in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`, - c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}}, + in: `{"server": true, "segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "rpc_listener": true, "advertise": "1.1.1.1"}]}`, + c: &Config{Server: true, Segments: []NetworkSegment{{ + Name: "alpha", + Bind: "127.0.0.1", + BindAddrs: []string{"127.0.0.1"}, + Port: 1234, + RPCListener: true, + Advertise: "1.1.1.1", + }}}, }, { in: `{"serf_lan_bind":"1.2.3.4"}`, @@ -1311,18 +1318,6 @@ func TestDecodeConfig_VerifyUniqueListeners(t *testing.T) { func TestDecodeConfig_ValidateSegments(t *testing.T) { t.Parallel() - serverWithSegment := &Config{Segment: "asfd", Server: true} - if err := ValidateSegments(serverWithSegment); !strings.Contains(err.Error(), "can only be set on clients") { - t.Fatalf("bad: %v", err) - } - - clientWithSegments := &Config{ - Segments: []NetworkSegment{{Name: "asdf"}}, - } - if err := ValidateSegments(clientWithSegments); !strings.Contains(err.Error(), "Cannot define segments on clients") { - t.Fatalf("bad: %v", err) - } - tooManySegments := &Config{Server: true} for i := 0; i < SegmentLimit+1; i++ { tooManySegments.Segments = append(tooManySegments.Segments, NetworkSegment{}) @@ -1338,6 +1333,17 @@ func TestDecodeConfig_ValidateSegments(t *testing.T) { if err := ValidateSegments(segmentNameTooLong); !strings.Contains(err.Error(), "exceeds maximum length") { t.Fatalf("bad: %v", err) } + + duplicatePorts := &Config{ + Segments: []NetworkSegment{ + {Name: "asdf", Port: 1234}, + {Name: "qwer", Port: 1234}, + }, + Server: true, + } + if err := ValidateSegments(duplicatePorts); !strings.Contains(err.Error(), "port 1234 overlaps with segment \"asdf\"") { + t.Fatalf("bad: %v", err) + } } func TestDefaultConfig(t *testing.T) { diff --git a/agent/consul/config.go b/agent/consul/config.go index c5d4d4501c..f7a7d72547 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -55,6 +55,7 @@ type NetworkSegment struct { Bind string Port int Advertise string + RPCAddr *net.TCPAddr SerfConfig *serf.Config } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 8afd09d301..b14fcb651d 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -46,10 +46,10 @@ const ( ) // listen is used to listen for incoming RPC connections -func (s *Server) listen() { +func (s *Server) listen(listener net.Listener) { for { // Accept a connection - conn, err := s.Listener.Accept() + conn, err := listener.Accept() if err != nil { if s.shutdown { return diff --git a/agent/consul/server.go b/agent/consul/server.go index b5b8e9beac..4bbb745aef 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -167,6 +167,9 @@ type Server struct { segmentLAN map[string]*serf.Serf segmentLock sync.RWMutex + // segmentListeners holds the RPC listener for any segment with a separate listener. + segmentListeners map[string]net.Listener + // serfWAN is the Serf cluster maintained between DC's // which SHOULD only consist of Consul servers serfWAN *serf.Serf @@ -306,6 +309,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* rpcTLS: incomingTLS, reassertLeaderCh: make(chan chan error), segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), + segmentListeners: make(map[string]net.Listener), sessionTimers: NewSessionTimers(), tombstoneGC: gc, serverLookup: NewServerLookup(), @@ -418,7 +422,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* } // Start listening for RPC requests. - go s.listen() + go s.listen(s.Listener) + + // Start listeners for any segments with separate RPC listeners. + for _, listener := range s.segmentListeners { + go s.listen(listener) + } // Start the metrics handlers. go s.sessionStats() @@ -645,6 +654,19 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise) } + for _, segment := range s.config.Segments { + if segment.RPCAddr == nil { + continue + } + + segmentListener, err := net.ListenTCP("tcp", segment.RPCAddr) + if err != nil { + return err + } + + s.segmentListeners[segment.Name] = segmentListener + } + // Provide a DC specific wrapper. Raft replication is only // ever done in the same datacenter, so we can provide it as a constant. wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap) diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 1debd33c18..6f2805cc99 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -53,6 +53,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) conf.Tags["build"] = s.config.Build addr := s.Listener.Addr().(*net.TCPAddr) + if listener, ok := s.segmentListeners[segment]; ok { + addr = listener.Addr().(*net.TCPAddr) + } conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) if s.config.Bootstrap { conf.Tags["bootstrap"] = "1" diff --git a/api/api.go b/api/api.go index 78d2091d27..31efecca03 100644 --- a/api/api.go +++ b/api/api.go @@ -465,7 +465,7 @@ func NewClient(config *Config) (*Client, error) { if config.Token == "" { config.Token = defConfig.Token } - + client := &Client{ config: *config, } diff --git a/command/agent.go b/command/agent.go index d41534baff..f9c1c1fc92 100644 --- a/command/agent.go +++ b/command/agent.go @@ -394,6 +394,16 @@ func (cmd *AgentCommand) readConfig() *agent.Config { return nil } + if cfg.Server && cfg.Segment != "" { + cmd.UI.Error("Segment option can only be set on clients") + return nil + } + + if !cfg.Server && len(cfg.Segments) > 0 { + cmd.UI.Error("Cannot define segments on clients") + return nil + } + // patch deprecated retry-join-{gce,azure,ec2)-* parameters // into -retry-join and issue warning. // todo(fs): this should really be in DecodeConfig where it can be tested