mirror of https://github.com/status-im/consul.git
Add rpc_listener option to segment config
This commit is contained in:
parent
a30e7657af
commit
2ada0439d4
|
@ -648,30 +648,12 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
|
|||
base.RPCAdvertise = a.config.AdvertiseAddrs.RPC
|
||||
}
|
||||
base.Segment = a.config.Segment
|
||||
for _, segment := range a.config.Segments {
|
||||
config := consul.DefaultConfig().SerfLANConfig
|
||||
|
||||
config.MemberlistConfig.AdvertiseAddr = segment.Advertise
|
||||
config.MemberlistConfig.AdvertisePort = segment.Port
|
||||
config.MemberlistConfig.BindAddr = segment.Bind
|
||||
config.MemberlistConfig.BindPort = segment.Port
|
||||
if a.config.ReconnectTimeoutLan != 0 {
|
||||
config.ReconnectTimeout = a.config.ReconnectTimeoutLan
|
||||
if len(a.config.Segments) > 0 {
|
||||
segments, err := a.segmentConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if a.config.EncryptVerifyIncoming != nil {
|
||||
config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming
|
||||
}
|
||||
if a.config.EncryptVerifyOutgoing != nil {
|
||||
config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing
|
||||
}
|
||||
|
||||
base.Segments = append(base.Segments, consul.NetworkSegment{
|
||||
Name: segment.Name,
|
||||
Bind: segment.Bind,
|
||||
Port: segment.Port,
|
||||
Advertise: segment.Advertise,
|
||||
SerfConfig: config,
|
||||
})
|
||||
base.Segments = segments
|
||||
}
|
||||
if a.config.Bootstrap {
|
||||
base.Bootstrap = true
|
||||
|
@ -789,6 +771,49 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
|
|||
return base, nil
|
||||
}
|
||||
|
||||
// Setup the serf and memberlist config for any defined network segments.
|
||||
func (a *Agent) segmentConfig() ([]consul.NetworkSegment, error) {
|
||||
var segments []consul.NetworkSegment
|
||||
config := a.config
|
||||
|
||||
for _, segment := range config.Segments {
|
||||
serfConf := consul.DefaultConfig().SerfLANConfig
|
||||
|
||||
serfConf.MemberlistConfig.AdvertiseAddr = segment.Advertise
|
||||
serfConf.MemberlistConfig.AdvertisePort = segment.Port
|
||||
serfConf.MemberlistConfig.BindAddr = segment.Bind
|
||||
serfConf.MemberlistConfig.BindPort = segment.Port
|
||||
if config.ReconnectTimeoutLan != 0 {
|
||||
serfConf.ReconnectTimeout = config.ReconnectTimeoutLan
|
||||
}
|
||||
if config.EncryptVerifyIncoming != nil {
|
||||
serfConf.MemberlistConfig.GossipVerifyIncoming = *config.EncryptVerifyIncoming
|
||||
}
|
||||
if config.EncryptVerifyOutgoing != nil {
|
||||
serfConf.MemberlistConfig.GossipVerifyOutgoing = *config.EncryptVerifyOutgoing
|
||||
}
|
||||
|
||||
var rpcAddr *net.TCPAddr
|
||||
if segment.RPCListener {
|
||||
rpcAddr = &net.TCPAddr{
|
||||
IP: net.ParseIP(segment.Bind),
|
||||
Port: a.config.Ports.Server,
|
||||
}
|
||||
}
|
||||
|
||||
segments = append(segments, consul.NetworkSegment{
|
||||
Name: segment.Name,
|
||||
Bind: segment.Bind,
|
||||
Port: segment.Port,
|
||||
Advertise: segment.Advertise,
|
||||
RPCAddr: rpcAddr,
|
||||
SerfConfig: serfConf,
|
||||
})
|
||||
}
|
||||
|
||||
return segments, nil
|
||||
}
|
||||
|
||||
// makeRandomID will generate a random UUID for a node.
|
||||
func (a *Agent) makeRandomID() (string, error) {
|
||||
id, err := uuid.GenerateUUID()
|
||||
|
|
|
@ -357,11 +357,16 @@ type NetworkSegment struct {
|
|||
Name string `mapstructure:"name"`
|
||||
|
||||
// Bind is the bind address for this segment.
|
||||
Bind string `mapstructure:"bind"`
|
||||
Bind string `mapstructure:"bind"`
|
||||
BindAddrs []string `mapstructure:"-"`
|
||||
|
||||
// Port is the port for this segment.
|
||||
Port int `mapstructure:"port"`
|
||||
|
||||
// RPCListener is whether to bind a separate RPC listener on the bind address
|
||||
// for this segment.
|
||||
RPCListener bool `mapstructure:"rpc_listener"`
|
||||
|
||||
// Advertise is the advertise address of this segment.
|
||||
Advertise string `mapstructure:"advertise"`
|
||||
}
|
||||
|
@ -1408,6 +1413,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
|
|||
result.AdvertiseAddrs.RPC = addr
|
||||
}
|
||||
|
||||
// Validate segment config.
|
||||
if err := ValidateSegments(&result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Enforce the max Raft multiplier.
|
||||
if result.Performance.RaftMultiplier > consul.MaxRaftMultiplier {
|
||||
return nil, fmt.Errorf("Performance.RaftMultiplier must be <= %d", consul.MaxRaftMultiplier)
|
||||
|
@ -1461,31 +1471,25 @@ func DecodeConfig(r io.Reader) (*Config, error) {
|
|||
return nil, fmt.Errorf("Failed to parse node metadata: %v", err)
|
||||
}
|
||||
|
||||
// Validate segment config
|
||||
if err := ValidateSegments(&result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func ValidateSegments(conf *Config) error {
|
||||
if conf.Server && conf.Segment != "" {
|
||||
return fmt.Errorf("Segment option can only be set on clients")
|
||||
}
|
||||
|
||||
if !conf.Server && len(conf.Segments) > 0 {
|
||||
return fmt.Errorf("Cannot define segments on clients")
|
||||
}
|
||||
|
||||
if len(conf.Segments) > SegmentLimit {
|
||||
return fmt.Errorf("Cannot exceed network segment limit of %d", SegmentLimit)
|
||||
}
|
||||
|
||||
takenPorts := make(map[int]string, len(conf.Segments))
|
||||
for _, segment := range conf.Segments {
|
||||
if len(segment.Name) > SegmentNameLimit {
|
||||
return fmt.Errorf("Segment name %q exceeds maximum length of %d", segment.Name, SegmentNameLimit)
|
||||
}
|
||||
|
||||
previous, ok := takenPorts[segment.Port]
|
||||
if ok {
|
||||
return fmt.Errorf("Segment %q port %d overlaps with segment %q", segment.Name, segment.Port, previous)
|
||||
}
|
||||
takenPorts[segment.Port] = segment.Name
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -2275,7 +2279,6 @@ func (c *Config) ResolveTmplAddrs() (err error) {
|
|||
for i, segment := range c.Segments {
|
||||
parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name))
|
||||
parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name))
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -597,8 +597,15 @@ func TestDecodeConfig(t *testing.T) {
|
|||
c: &Config{Segment: "thing"},
|
||||
},
|
||||
{
|
||||
in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`,
|
||||
c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}},
|
||||
in: `{"server": true, "segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "rpc_listener": true, "advertise": "1.1.1.1"}]}`,
|
||||
c: &Config{Server: true, Segments: []NetworkSegment{{
|
||||
Name: "alpha",
|
||||
Bind: "127.0.0.1",
|
||||
BindAddrs: []string{"127.0.0.1"},
|
||||
Port: 1234,
|
||||
RPCListener: true,
|
||||
Advertise: "1.1.1.1",
|
||||
}}},
|
||||
},
|
||||
{
|
||||
in: `{"serf_lan_bind":"1.2.3.4"}`,
|
||||
|
@ -1311,18 +1318,6 @@ func TestDecodeConfig_VerifyUniqueListeners(t *testing.T) {
|
|||
|
||||
func TestDecodeConfig_ValidateSegments(t *testing.T) {
|
||||
t.Parallel()
|
||||
serverWithSegment := &Config{Segment: "asfd", Server: true}
|
||||
if err := ValidateSegments(serverWithSegment); !strings.Contains(err.Error(), "can only be set on clients") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
clientWithSegments := &Config{
|
||||
Segments: []NetworkSegment{{Name: "asdf"}},
|
||||
}
|
||||
if err := ValidateSegments(clientWithSegments); !strings.Contains(err.Error(), "Cannot define segments on clients") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
tooManySegments := &Config{Server: true}
|
||||
for i := 0; i < SegmentLimit+1; i++ {
|
||||
tooManySegments.Segments = append(tooManySegments.Segments, NetworkSegment{})
|
||||
|
@ -1338,6 +1333,17 @@ func TestDecodeConfig_ValidateSegments(t *testing.T) {
|
|||
if err := ValidateSegments(segmentNameTooLong); !strings.Contains(err.Error(), "exceeds maximum length") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
duplicatePorts := &Config{
|
||||
Segments: []NetworkSegment{
|
||||
{Name: "asdf", Port: 1234},
|
||||
{Name: "qwer", Port: 1234},
|
||||
},
|
||||
Server: true,
|
||||
}
|
||||
if err := ValidateSegments(duplicatePorts); !strings.Contains(err.Error(), "port 1234 overlaps with segment \"asdf\"") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultConfig(t *testing.T) {
|
||||
|
|
|
@ -55,6 +55,7 @@ type NetworkSegment struct {
|
|||
Bind string
|
||||
Port int
|
||||
Advertise string
|
||||
RPCAddr *net.TCPAddr
|
||||
SerfConfig *serf.Config
|
||||
}
|
||||
|
||||
|
|
|
@ -46,10 +46,10 @@ const (
|
|||
)
|
||||
|
||||
// listen is used to listen for incoming RPC connections
|
||||
func (s *Server) listen() {
|
||||
func (s *Server) listen(listener net.Listener) {
|
||||
for {
|
||||
// Accept a connection
|
||||
conn, err := s.Listener.Accept()
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
if s.shutdown {
|
||||
return
|
||||
|
|
|
@ -167,6 +167,9 @@ type Server struct {
|
|||
segmentLAN map[string]*serf.Serf
|
||||
segmentLock sync.RWMutex
|
||||
|
||||
// segmentListeners holds the RPC listener for any segment with a separate listener.
|
||||
segmentListeners map[string]net.Listener
|
||||
|
||||
// serfWAN is the Serf cluster maintained between DC's
|
||||
// which SHOULD only consist of Consul servers
|
||||
serfWAN *serf.Serf
|
||||
|
@ -306,6 +309,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
rpcTLS: incomingTLS,
|
||||
reassertLeaderCh: make(chan chan error),
|
||||
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
|
||||
segmentListeners: make(map[string]net.Listener),
|
||||
sessionTimers: NewSessionTimers(),
|
||||
tombstoneGC: gc,
|
||||
serverLookup: NewServerLookup(),
|
||||
|
@ -418,7 +422,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
|
|||
}
|
||||
|
||||
// Start listening for RPC requests.
|
||||
go s.listen()
|
||||
go s.listen(s.Listener)
|
||||
|
||||
// Start listeners for any segments with separate RPC listeners.
|
||||
for _, listener := range s.segmentListeners {
|
||||
go s.listen(listener)
|
||||
}
|
||||
|
||||
// Start the metrics handlers.
|
||||
go s.sessionStats()
|
||||
|
@ -645,6 +654,19 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
|
|||
return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise)
|
||||
}
|
||||
|
||||
for _, segment := range s.config.Segments {
|
||||
if segment.RPCAddr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
segmentListener, err := net.ListenTCP("tcp", segment.RPCAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.segmentListeners[segment.Name] = segmentListener
|
||||
}
|
||||
|
||||
// Provide a DC specific wrapper. Raft replication is only
|
||||
// ever done in the same datacenter, so we can provide it as a constant.
|
||||
wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
|
||||
|
|
|
@ -53,6 +53,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
|||
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
|
||||
conf.Tags["build"] = s.config.Build
|
||||
addr := s.Listener.Addr().(*net.TCPAddr)
|
||||
if listener, ok := s.segmentListeners[segment]; ok {
|
||||
addr = listener.Addr().(*net.TCPAddr)
|
||||
}
|
||||
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
|
||||
if s.config.Bootstrap {
|
||||
conf.Tags["bootstrap"] = "1"
|
||||
|
|
|
@ -465,7 +465,7 @@ func NewClient(config *Config) (*Client, error) {
|
|||
if config.Token == "" {
|
||||
config.Token = defConfig.Token
|
||||
}
|
||||
|
||||
|
||||
client := &Client{
|
||||
config: *config,
|
||||
}
|
||||
|
|
|
@ -394,6 +394,16 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
|
|||
return nil
|
||||
}
|
||||
|
||||
if cfg.Server && cfg.Segment != "" {
|
||||
cmd.UI.Error("Segment option can only be set on clients")
|
||||
return nil
|
||||
}
|
||||
|
||||
if !cfg.Server && len(cfg.Segments) > 0 {
|
||||
cmd.UI.Error("Cannot define segments on clients")
|
||||
return nil
|
||||
}
|
||||
|
||||
// patch deprecated retry-join-{gce,azure,ec2)-* parameters
|
||||
// into -retry-join and issue warning.
|
||||
// todo(fs): this should really be in DecodeConfig where it can be tested
|
||||
|
|
Loading…
Reference in New Issue