Organize segments for a cleaner split between enterprise and OSS

This commit is contained in:
Kyle Havlovitz 2017-08-31 17:39:46 -07:00
parent 00d03f1141
commit 62102a537e
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
13 changed files with 136 additions and 157 deletions

View File

@ -1474,31 +1474,6 @@ func DecodeConfig(r io.Reader) (*Config, error) {
return &result, nil return &result, nil
} }
func ValidateSegments(conf *Config) error {
if len(conf.Segments) > SegmentLimit {
return fmt.Errorf("Cannot exceed network segment limit of %d", SegmentLimit)
}
takenPorts := make(map[int]string, len(conf.Segments))
for _, segment := range conf.Segments {
if segment.Name == "" {
return fmt.Errorf("Segment name cannot be blank")
}
if len(segment.Name) > SegmentNameLimit {
return fmt.Errorf("Segment name %q exceeds maximum length of %d", segment.Name, SegmentNameLimit)
}
previous, ok := takenPorts[segment.Port]
if ok {
return fmt.Errorf("Segment %q port %d overlaps with segment %q", segment.Name, segment.Port, previous)
}
takenPorts[segment.Port] = segment.Name
}
return nil
}
// DecodeServiceDefinition is used to decode a service definition // DecodeServiceDefinition is used to decode a service definition
func DecodeServiceDefinition(raw interface{}) (*structs.ServiceDefinition, error) { func DecodeServiceDefinition(raw interface{}) (*structs.ServiceDefinition, error) {
rawMap, ok := raw.(map[string]interface{}) rawMap, ok := raw.(map[string]interface{})

View File

@ -592,20 +592,6 @@ func TestDecodeConfig(t *testing.T) {
in: `{"retry_max_wan":123}`, in: `{"retry_max_wan":123}`,
c: &Config{RetryMaxAttemptsWan: 123}, c: &Config{RetryMaxAttemptsWan: 123},
}, },
{
in: `{"segment":"thing"}`,
c: &Config{Segment: "thing"},
},
{
in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "rpc_listener": true, "advertise": "1.1.1.1"}]}`,
c: &Config{Segments: []NetworkSegment{{
Name: "alpha",
Bind: "127.0.0.1",
Port: 1234,
RPCListener: true,
Advertise: "1.1.1.1",
}}},
},
{ {
in: `{"serf_lan_bind":"1.2.3.4"}`, in: `{"serf_lan_bind":"1.2.3.4"}`,
c: &Config{SerfLanBindAddr: "1.2.3.4"}, c: &Config{SerfLanBindAddr: "1.2.3.4"},
@ -1315,43 +1301,6 @@ func TestDecodeConfig_VerifyUniqueListeners(t *testing.T) {
} }
} }
func TestDecodeConfig_ValidateSegments(t *testing.T) {
t.Parallel()
tooManySegments := &Config{Server: true}
for i := 0; i < SegmentLimit+1; i++ {
tooManySegments.Segments = append(tooManySegments.Segments, NetworkSegment{})
}
if err := ValidateSegments(tooManySegments); !strings.Contains(err.Error(), "Cannot exceed network segment limit") {
t.Fatalf("bad: %v", err)
}
if err := ValidateSegments(&Config{
Segments: []NetworkSegment{{Name: ""}},
Server: true,
}); !strings.Contains(err.Error(), "Segment name cannot be blank") {
t.Fatalf("bad: %v", err)
}
segmentNameTooLong := &Config{
Segments: []NetworkSegment{{Name: strings.Repeat("a", SegmentNameLimit+1)}},
Server: true,
}
if err := ValidateSegments(segmentNameTooLong); !strings.Contains(err.Error(), "exceeds maximum length") {
t.Fatalf("bad: %v", err)
}
duplicatePorts := &Config{
Segments: []NetworkSegment{
{Name: "asdf", Port: 1234},
{Name: "qwer", Port: 1234},
},
Server: true,
}
if err := ValidateSegments(duplicatePorts); !strings.Contains(err.Error(), "port 1234 overlaps with segment \"asdf\"") {
t.Fatalf("bad: %v", err)
}
}
func TestDefaultConfig(t *testing.T) { func TestDefaultConfig(t *testing.T) {
t.Parallel() t.Parallel()

View File

@ -3,26 +3,19 @@
package consul package consul
import ( import (
"errors" "net"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
const (
errSegmentsNotSupported = "network segments are not supported in this version of Consul"
)
var (
ErrSegmentsNotSupported = errors.New(errSegmentsNotSupported)
)
// LANSegmentMembers is used to return the members of the given LAN segment. // LANSegmentMembers is used to return the members of the given LAN segment.
func (s *Server) LANSegmentMembers(segment string) ([]serf.Member, error) { func (s *Server) LANSegmentMembers(segment string) ([]serf.Member, error) {
if segment == "" { if segment == "" {
return s.LANMembers(), nil return s.LANMembers(), nil
} }
return nil, ErrSegmentsNotSupported return nil, structs.ErrSegmentsNotSupported
} }
// LANSegmentAddr is used to return the address used for the given LAN segment. // LANSegmentAddr is used to return the address used for the given LAN segment.
@ -30,11 +23,21 @@ func (s *Server) LANSegmentAddr(name string) string {
return "" return ""
} }
// setupSegmentRPC returns an error if any segments are defined since the OSS
// version of Consul doesn't support them.
func (s *Server) setupSegmentRPC() (map[string]net.Listener, error) {
if len(s.config.Segments) > 0 {
return nil, structs.ErrSegmentsNotSupported
}
return nil, nil
}
// setupSegments returns an error if any segments are defined since the OSS // setupSegments returns an error if any segments are defined since the OSS
// version of Consul doens't support them. // version of Consul doesn't support them.
func (s *Server) setupSegments(config *Config, port int) error { func (s *Server) setupSegments(config *Config, port int) error {
if len(config.Segments) > 0 { if len(config.Segments) > 0 {
return ErrSegmentsNotSupported return structs.ErrSegmentsNotSupported
} }
return nil return nil

View File

@ -50,11 +50,10 @@ const (
) )
const ( const (
serfLANSnapshot = "serf/local.snapshot" serfLANSnapshot = "serf/local.snapshot"
serfLANSegmentSnapshot = "serf/local-segment-%s.snapshot" serfWANSnapshot = "serf/remote.snapshot"
serfWANSnapshot = "serf/remote.snapshot" raftState = "raft/"
raftState = "raft/" snapshotsRetained = 2
snapshotsRetained = 2
// serverRPCCache controls how long we keep an idle connection // serverRPCCache controls how long we keep an idle connection
// open to a server // open to a server
@ -164,11 +163,7 @@ type Server struct {
serfLAN *serf.Serf serfLAN *serf.Serf
// segmentLAN maps segment names to their Serf cluster // segmentLAN maps segment names to their Serf cluster
segmentLAN map[string]*serf.Serf segmentLAN map[string]*serf.Serf
segmentLock sync.RWMutex
// segmentListeners holds the RPC listener for any segment with a separate listener.
segmentListeners map[string]net.Listener
// serfWAN is the Serf cluster maintained between DC's // serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers // which SHOULD only consist of Consul servers
@ -309,7 +304,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error), reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
segmentListeners: make(map[string]net.Listener),
sessionTimers: NewSessionTimers(), sessionTimers: NewSessionTimers(),
tombstoneGC: gc, tombstoneGC: gc,
serverLookup: NewServerLookup(), serverLookup: NewServerLookup(),
@ -346,6 +340,13 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
return nil, fmt.Errorf("Failed to start RPC layer: %v", err) return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
} }
// Initialize any extra RPC listeners for segments.
segmentListeners, err := s.setupSegmentRPC()
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start segment RPC layer: %v", err)
}
// Initialize the Raft server. // Initialize the Raft server.
if err := s.setupRaft(); err != nil { if err := s.setupRaft(); err != nil {
s.Shutdown() s.Shutdown()
@ -363,7 +364,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// Initialize the WAN Serf. // Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "") s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil { if err != nil {
s.Shutdown() s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err) return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
@ -386,7 +387,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
} }
// Initialize the LAN Serf for the default network segment. // Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "") s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener)
if err != nil { if err != nil {
s.Shutdown() s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err) return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
@ -425,7 +426,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
go s.listen(s.Listener) go s.listen(s.Listener)
// Start listeners for any segments with separate RPC listeners. // Start listeners for any segments with separate RPC listeners.
for _, listener := range s.segmentListeners { for _, listener := range segmentListeners {
go s.listen(listener) go s.listen(listener)
} }
@ -654,19 +655,6 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise) return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise)
} }
for _, segment := range s.config.Segments {
if segment.RPCAddr == nil {
continue
}
segmentListener, err := net.ListenTCP("tcp", segment.RPCAddr)
if err != nil {
return err
}
s.segmentListeners[segment.Name] = segmentListener
}
// Provide a DC specific wrapper. Raft replication is only // Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant. // ever done in the same datacenter, so we can provide it as a constant.
wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap) wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
@ -910,9 +898,6 @@ func (s *Server) Encrypted() bool {
// LANSegments returns a map of LAN segments by name // LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf { func (s *Server) LANSegments() map[string]*serf.Serf {
s.segmentLock.RLock()
defer s.segmentLock.RUnlock()
segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1) segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1)
segments[""] = s.serfLAN segments[""] = s.serfLAN
for name, segment := range s.segmentLAN { for name, segment := range s.segmentLAN {

View File

@ -29,7 +29,8 @@ const (
) )
// setupSerf is used to setup and initialize a Serf // setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int, segment string) (*serf.Serf, error) { func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int,
segment string, listener net.Listener) (*serf.Serf, error) {
conf.Init() conf.Init()
if wan { if wan {
@ -43,8 +44,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["segment"] = segment conf.Tags["segment"] = segment
if segment == "" { if segment == "" {
for _, s := range s.config.Segments { for _, s := range s.config.Segments {
conf.Tags["segment_addr_"+s.Name] = s.Advertise conf.Tags["sl_"+s.Name] = net.JoinHostPort(s.Advertise, fmt.Sprintf("%d", s.Port))
conf.Tags["segment_port_"+s.Name] = fmt.Sprintf("%d", s.Port)
} }
} }
conf.Tags["id"] = string(s.config.NodeID) conf.Tags["id"] = string(s.config.NodeID)
@ -53,10 +53,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build conf.Tags["build"] = s.config.Build
addr := s.Listener.Addr().(*net.TCPAddr) addr := listener.Addr().(*net.TCPAddr)
if listener, ok := s.segmentListeners[segment]; ok {
addr = listener.Addr().(*net.TCPAddr)
}
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap { if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1" conf.Tags["bootstrap"] = "1"

View File

@ -100,16 +100,19 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
segment_addrs := make(map[string]string) segment_addrs := make(map[string]string)
segment_ports := make(map[string]int) segment_ports := make(map[string]int)
for name, value := range m.Tags { for name, value := range m.Tags {
if strings.HasPrefix(name, "segment_addr_") { if strings.HasPrefix(name, "sl_") {
segment_addrs[strings.TrimPrefix(name, "segment_addr_")] = value addr, port, err := net.SplitHostPort(value)
}
if strings.HasPrefix(name, "segment_port_") {
segment_port, err := strconv.Atoi(value)
if err != nil { if err != nil {
return false, nil return false, nil
} }
segment_ports[strings.TrimPrefix(name, "segment_port_")] = segment_port segment_port, err := strconv.Atoi(port)
if err != nil {
return false, nil
}
segment_name := strings.TrimPrefix(name, "sl_")
segment_addrs[segment_name] = addr
segment_ports[segment_name] = segment_port
} }
} }

19
agent/segment_stub.go Normal file
View File

@ -0,0 +1,19 @@
// +build !ent
package agent
import (
"github.com/hashicorp/consul/agent/structs"
)
func ValidateSegments(conf *Config) error {
if conf.Segment != "" {
return structs.ErrSegmentsNotSupported
}
if len(conf.Segments) > 0 {
return structs.ErrSegmentsNotSupported
}
return nil
}

View File

@ -20,6 +20,7 @@ var (
ErrNoDCPath = fmt.Errorf("No path to datacenter") ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers") ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads") ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
ErrSegmentsNotSupported = fmt.Errorf("Network segments are not supported in this version of Consul")
) )
type MessageType uint8 type MessageType uint8

View File

@ -502,34 +502,50 @@ func TestStructs_ValidateMetadata(t *testing.T) {
if err := ValidateMetadata(meta, false); !strings.Contains(err.Error(), "cannot contain more than") { if err := ValidateMetadata(meta, false); !strings.Contains(err.Error(), "cannot contain more than") {
t.Fatalf("should have failed") t.Fatalf("should have failed")
} }
// Should not error
meta = map[string]string{
metaKeyReservedPrefix + "key": "value1",
}
// Should fail
if err := ValidateMetadata(meta, false); err == nil || !strings.Contains(err.Error(), "reserved for internal use") {
t.Fatalf("err: %s", err)
}
// Should succeed
if err := ValidateMetadata(meta, true); err != nil {
t.Fatalf("err: %s", err)
}
} }
func TestStructs_validateMetaPair(t *testing.T) { func TestStructs_validateMetaPair(t *testing.T) {
longKey := strings.Repeat("a", metaKeyMaxLength+1) longKey := strings.Repeat("a", metaKeyMaxLength+1)
longValue := strings.Repeat("b", metaValueMaxLength+1) longValue := strings.Repeat("b", metaValueMaxLength+1)
pairs := []struct { pairs := []struct {
Key string Key string
Value string Value string
Error string Error string
AllowConsulPrefix bool
}{ }{
// valid pair // valid pair
{"key", "value", ""}, {"key", "value", "", false},
// invalid, blank key // invalid, blank key
{"", "value", "cannot be blank"}, {"", "value", "cannot be blank", false},
// allowed special chars in key name // allowed special chars in key name
{"k_e-y", "value", ""}, {"k_e-y", "value", "", false},
// disallowed special chars in key name // disallowed special chars in key name
{"(%key&)", "value", "invalid characters"}, {"(%key&)", "value", "invalid characters", false},
// key too long // key too long
{longKey, "value", "Key is too long"}, {longKey, "value", "Key is too long", false},
// reserved prefix // reserved prefix
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"}, {metaKeyReservedPrefix + "key", "value", "reserved for internal use", false},
// reserved prefix, allowed
{metaKeyReservedPrefix + "key", "value", "", true},
// value too long // value too long
{"key", longValue, "Value is too long"}, {"key", longValue, "Value is too long", false},
} }
for _, pair := range pairs { for _, pair := range pairs {
err := validateMetaPair(pair.Key, pair.Value, false) err := validateMetaPair(pair.Key, pair.Value, pair.AllowConsulPrefix)
if pair.Error == "" && err != nil { if pair.Error == "" && err != nil {
t.Fatalf("should have succeeded: %v, %v", pair, err) t.Fatalf("should have succeeded: %v, %v", pair, err)
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) { } else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {

View File

@ -46,8 +46,8 @@ type AgentMember struct {
// MembersOpts is used for querying member information. // MembersOpts is used for querying member information.
type MembersOpts struct { type MembersOpts struct {
// Wan is whether to show members from the LAN. // WAN is whether to show members from the WAN.
Wan bool WAN bool
// Segment is the LAN segment to show members. // Segment is the LAN segment to show members.
Segment string Segment string
@ -270,7 +270,7 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
func (a *Agent) MembersOpts(opts MembersOpts) ([]*AgentMember, error) { func (a *Agent) MembersOpts(opts MembersOpts) ([]*AgentMember, error) {
r := a.c.newRequest("GET", "/v1/agent/members") r := a.c.newRequest("GET", "/v1/agent/members")
r.params.Set("segment", opts.Segment) r.params.Set("segment", opts.Segment)
if opts.Wan { if opts.WAN {
r.params.Set("wan", "1") r.params.Set("wan", "1")
} }

View File

@ -91,6 +91,29 @@ func TestAPI_AgentReload(t *testing.T) {
} }
} }
func TestAPI_AgentMembersOpts(t *testing.T) {
t.Parallel()
c, s1 := makeClient(t)
_, s2 := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
c.Datacenter = "dc2"
})
defer s1.Stop()
defer s2.Stop()
agent := c.Agent()
s2.JoinWAN(t, s1.WANAddr)
members, err := agent.MembersOpts(MembersOpts{WAN: true})
if err != nil {
t.Fatalf("err: %v", err)
}
if len(members) != 2 {
t.Fatalf("bad: %v", members)
}
}
func TestAPI_AgentMembers(t *testing.T) { func TestAPI_AgentMembers(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)

View File

@ -65,16 +65,8 @@ func (c *MembersCommand) Run(args []string) int {
return 1 return 1
} }
members, err := client.Agent().MembersOpts(consulapi.MembersOpts{
Wan: wan,
Segment: segment,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err))
return 1
}
// Check if we queried a server and need to query for members in all segments. // Check if we queried a server and need to query for members in all segments.
var members []*consulapi.AgentMember
if !wan && segment == "" { if !wan && segment == "" {
self, err := client.Agent().Self() self, err := client.Agent().Self()
if err != nil { if err != nil {
@ -87,7 +79,17 @@ func (c *MembersCommand) Run(args []string) int {
c.UI.Error(fmt.Sprintf("Error retrieving members in segments: %s", err)) c.UI.Error(fmt.Sprintf("Error retrieving members in segments: %s", err))
return 1 return 1
} }
members = append(members, segmentMembers...) members = segmentMembers
}
} else {
var err error
members, err = client.Agent().MembersOpts(consulapi.MembersOpts{
WAN: wan,
Segment: segment,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err))
return 1
} }
} }

View File

@ -4,10 +4,16 @@ package command
import ( import (
consulapi "github.com/hashicorp/consul/api" consulapi "github.com/hashicorp/consul/api"
"fmt"
) )
// getSegmentMembers returns an empty list since network segments are not // getSegmentMembers returns an empty list since network segments are not
// supported in OSS Consul. // supported in OSS Consul.
func getSegmentMembers(client *consulapi.Client) ([]*consulapi.AgentMember, error) { func getSegmentMembers(client *consulapi.Client) ([]*consulapi.AgentMember, error) {
return nil, nil members, err := client.Agent().MembersOpts(consulapi.MembersOpts{})
if err != nil {
return nil, fmt.Errorf("Error retrieving members: %s", err)
}
return members, nil
} }