Merge pull request #3431 from hashicorp/network-segments-oss

This commit is contained in:
Kyle Havlovitz 2017-09-01 10:24:58 -07:00 committed by GitHub
commit 220db48aa7
71 changed files with 1690 additions and 544 deletions

View File

@ -58,7 +58,7 @@ cov:
test: dev-build vet
go test -tags '$(GOTAGS)' -i ./...
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test$(GOTEST_FLAGS).log ; echo $$? > exit-code
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test.log ; echo $$? > exit-code
@echo "Exit code: `cat exit-code`" >> test$(GOTEST_FLAGS).log
@echo "----"
@grep -A5 'DATA RACE' test.log || true

View File

@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host"
)
@ -56,9 +55,10 @@ const (
// consul.Client and consul.Server.
type delegate interface {
Encrypted() bool
GetLANCoordinate() (*coordinate.Coordinate, error)
GetLANCoordinate() (lib.CoordinateSet, error)
Leave() error
LANMembers() []serf.Member
LANSegmentMembers(segment string) ([]serf.Member, error)
LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string) error
@ -647,6 +647,14 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.AdvertiseAddrs.RPC != nil {
base.RPCAdvertise = a.config.AdvertiseAddrs.RPC
}
base.Segment = a.config.Segment
if len(a.config.Segments) > 0 {
segments, err := a.segmentConfig()
if err != nil {
return nil, err
}
base.Segments = segments
}
if a.config.Bootstrap {
base.Bootstrap = true
}
@ -763,6 +771,58 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
return base, nil
}
// Setup the serf and memberlist config for any defined network segments.
func (a *Agent) segmentConfig() ([]consul.NetworkSegment, error) {
var segments []consul.NetworkSegment
config := a.config
for _, segment := range config.Segments {
serfConf := consul.DefaultConfig().SerfLANConfig
if segment.Advertise != "" {
serfConf.MemberlistConfig.AdvertiseAddr = segment.Advertise
} else {
serfConf.MemberlistConfig.AdvertiseAddr = a.config.AdvertiseAddr
}
if segment.Bind != "" {
serfConf.MemberlistConfig.BindAddr = segment.Bind
} else {
serfConf.MemberlistConfig.BindAddr = a.config.BindAddr
}
serfConf.MemberlistConfig.AdvertisePort = segment.Port
serfConf.MemberlistConfig.BindPort = segment.Port
if config.ReconnectTimeoutLan != 0 {
serfConf.ReconnectTimeout = config.ReconnectTimeoutLan
}
if config.EncryptVerifyIncoming != nil {
serfConf.MemberlistConfig.GossipVerifyIncoming = *config.EncryptVerifyIncoming
}
if config.EncryptVerifyOutgoing != nil {
serfConf.MemberlistConfig.GossipVerifyOutgoing = *config.EncryptVerifyOutgoing
}
var rpcAddr *net.TCPAddr
if segment.RPCListener {
rpcAddr = &net.TCPAddr{
IP: net.ParseIP(segment.Bind),
Port: a.config.Ports.Server,
}
}
segments = append(segments, consul.NetworkSegment{
Name: segment.Name,
Bind: serfConf.MemberlistConfig.BindAddr,
Port: segment.Port,
Advertise: serfConf.MemberlistConfig.AdvertiseAddr,
RPCAddr: rpcAddr,
SerfConfig: serfConf,
})
}
return segments, nil
}
// makeRandomID will generate a random UUID for a node.
func (a *Agent) makeRandomID() (string, error) {
id, err := uuid.GenerateUUID()
@ -1154,15 +1214,16 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}
// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates
// are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) {
// GetLANCoordinate returns the coordinates of this node in the local pools
// (assumes coordinates are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (lib.CoordinateSet, error) {
return a.delegate.GetLANCoordinate()
}
// sendCoordinate is a long-running loop that periodically sends our coordinate
// to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinate() {
OUTER:
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
@ -1182,26 +1243,29 @@ func (a *Agent) sendCoordinate() {
continue
}
c, err := a.GetLANCoordinate()
cs, err := a.GetLANCoordinate()
if err != nil {
a.logger.Printf("[ERR] agent: Failed to get coordinate: %s", err)
continue
}
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Coord: c,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
if acl.IsErrPermissionDenied(err) {
a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs")
} else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
for segment, coord := range cs {
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Segment: segment,
Coord: coord,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
if acl.IsErrPermissionDenied(err) {
a.logger.Printf("[WARN] agent: Coordinate update blocked by ACLs")
} else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
}
continue OUTER
}
continue
}
case <-a.shutdownCh:
return
@ -2105,6 +2169,8 @@ func (a *Agent) loadMetadata(conf *Config) error {
a.state.metadata[key] = value
}
a.state.metadata[structs.MetaSegmentKey] = conf.Segment
a.state.changeMade()
return nil

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/logutils"
@ -27,10 +28,10 @@ type Self struct {
}
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var c *coordinate.Coordinate
var cs lib.CoordinateSet
if !s.agent.config.DisableCoordinates {
var err error
if c, err = s.agent.GetLANCoordinate(); err != nil {
if cs, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}
@ -48,7 +49,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
return Self{
Config: s.agent.config,
Coord: c,
Coord: cs[s.agent.config.Segment],
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
@ -155,11 +156,28 @@ func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (
wan = true
}
segment := req.URL.Query().Get("segment")
if wan && segment != "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Cannot provide a segment with wan=true")
return nil, nil
}
var members []serf.Member
if wan {
members = s.agent.WANMembers()
} else {
members = s.agent.LANMembers()
// If the segment is blank when querying a client, use the agent's
// segment instead of the empty string.
if !s.agent.config.Server && segment == "" {
segment = s.agent.config.Segment
}
var err error
members, err = s.agent.delegate.LANSegmentMembers(segment)
if err != nil {
return nil, err
}
}
if err := s.agent.filterMembers(token, &members); err != nil {
return nil, err

View File

@ -191,13 +191,14 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("incorrect port: %v", obj)
}
c, err := a.GetLANCoordinate()
cs, err := a.GetLANCoordinate()
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(c, val.Coord) {
if c := cs[cfg.Segment]; !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
}
delete(val.Meta, structs.MetaSegmentKey) // Added later, not in config.
if !reflect.DeepEqual(cfg.Meta, val.Meta) {
t.Fatalf("meta fields are not equal: %v != %v", cfg.Meta, val.Meta)
}

File diff suppressed because one or more lines are too long

View File

@ -25,6 +25,14 @@ import (
"github.com/mitchellh/mapstructure"
)
const (
// SegmentLimit is the maximum number of network segments that may be declared.
SegmentLimit = 64
// SegmentNameLimit is the maximum segment name length.
SegmentNameLimit = 64
)
// Ports is used to simplify the configuration by
// providing default ports, and allowing the addresses
// to only be specified once
@ -342,6 +350,26 @@ type Autopilot struct {
UpgradeVersionTag string `mapstructure:"upgrade_version_tag"`
}
// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
// isolated serf group on the LAN.
type NetworkSegment struct {
// Name is the name of the segment.
Name string `mapstructure:"name"`
// Bind is the bind address for this segment.
Bind string `mapstructure:"bind"`
// Port is the port for this segment.
Port int `mapstructure:"port"`
// RPCListener is whether to bind a separate RPC listener on the bind address
// for this segment.
RPCListener bool `mapstructure:"rpc_listener"`
// Advertise is the advertise address of this segment.
Advertise string `mapstructure:"advertise"`
}
// Config is the configuration that can be set for an Agent.
// Some of this is configurable as CLI flags, but most must
// be set using a configuration file.
@ -465,6 +493,13 @@ type Config struct {
// Address configurations
Addresses AddressConfig
// (Enterprise-only) NetworkSegment is the network segment for this client to join.
Segment string `mapstructure:"segment"`
// (Enterprise-only) Segments is the list of network segments for this server to
// initialize.
Segments []NetworkSegment `mapstructure:"segments"`
// Tagged addresses. These are used to publish a set of addresses for
// for a node, which can be used by the remote agent. We currently
// populate only the "wan" tag based on the SerfWan advertise address,
@ -1378,6 +1413,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.AdvertiseAddrs.RPC = addr
}
// Validate segment config.
if err := ValidateSegments(&result); err != nil {
return nil, err
}
// Enforce the max Raft multiplier.
if result.Performance.RaftMultiplier > consul.MaxRaftMultiplier {
return nil, fmt.Errorf("Performance.RaftMultiplier must be <= %d", consul.MaxRaftMultiplier)
@ -1426,6 +1466,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
}
}
// Validate node meta fields
if err := structs.ValidateMetadata(result.Meta, false); err != nil {
return nil, fmt.Errorf("Failed to parse node metadata: %v", err)
}
return &result, nil
}
@ -1861,6 +1906,12 @@ func MergeConfig(a, b *Config) *Config {
if b.Addresses.RPC != "" {
result.Addresses.RPC = b.Addresses.RPC
}
if b.Segment != "" {
result.Segment = b.Segment
}
if len(b.Segments) > 0 {
result.Segments = append(result.Segments, b.Segments...)
}
if b.EnableUI {
result.EnableUI = true
}
@ -2204,6 +2255,10 @@ func (c *Config) ResolveTmplAddrs() (err error) {
parse(&c.ClientAddr, true, "Client address")
parse(&c.SerfLanBindAddr, false, "Serf LAN address")
parse(&c.SerfWanBindAddr, false, "Serf WAN address")
for i, segment := range c.Segments {
parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name))
parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name))
}
return
}

View File

@ -1401,6 +1401,15 @@ func TestMergeConfig(t *testing.T) {
HTTP: "127.0.0.2",
HTTPS: "127.0.0.4",
},
Segment: "alpha",
Segments: []NetworkSegment{
{
Name: "alpha",
Bind: "127.0.0.1",
Port: 1234,
Advertise: "127.0.0.2",
},
},
Server: true,
LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true),

View File

@ -890,9 +890,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
t.Fatalf("err: %v", err)
@ -1495,9 +1495,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
t.Fatalf("err: %v", err)

View File

@ -5,18 +5,14 @@ import (
"io"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
@ -146,35 +142,6 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
return c, nil
}
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.Init()
conf.NodeName = c.config.NodeName
conf.Tags["role"] = "node"
conf.Tags["dc"] = c.config.Datacenter
conf.Tags["id"] = string(c.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["build"] = c.config.Build
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.Logger = c.logger
conf.EventCh = ch
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// Shutdown is used to shutdown the client
func (c *Client) Shutdown() error {
c.logger.Printf("[INFO] consul: shutting down client")
@ -227,6 +194,16 @@ func (c *Client) LANMembers() []serf.Member {
return c.serf.Members()
}
// LANSegmentMembers only returns our own segment's members, because clients
// can't be in multiple segments.
func (c *Client) LANSegmentMembers(segment string) ([]serf.Member, error) {
if segment == c.config.Segment {
return c.LANMembers(), nil
}
return nil, fmt.Errorf("segment %q not found", segment)
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node)
@ -242,98 +219,6 @@ func (c *Client) Encrypted() bool {
return c.serf.EncryptionEnabled()
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
if parts.Datacenter != c.config.Datacenter {
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
m.Name, parts.Datacenter)
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
c.routers.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
c.logger.Printf("[INFO] consul: removing server %s", parts)
c.routers.RemoveServer(parts)
}
}
// localEvent is called when we receive an event on the local Serf
func (c *Client) localEvent(event serf.UserEvent) {
// Handle only consul events
if !strings.HasPrefix(event.Name, "consul:") {
return
}
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
}
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
server := c.routers.FindServer()
@ -413,6 +298,12 @@ func (c *Client) Stats() map[string]map[string]string {
// GetLANCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) {
return c.serf.GetCoordinate()
func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
lan, err := c.serf.GetCoordinate()
if err != nil {
return nil, err
}
cs := lib.CoordinateSet{c.config.Segment: lan}
return cs, nil
}

137
agent/consul/client_serf.go Normal file
View File

@ -0,0 +1,137 @@
package consul
import (
"fmt"
"path/filepath"
"strings"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/serf"
)
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.Init()
conf.NodeName = c.config.NodeName
conf.Tags["role"] = "node"
conf.Tags["dc"] = c.config.Datacenter
conf.Tags["segment"] = c.config.Segment
conf.Tags["id"] = string(c.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["build"] = c.config.Build
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.Logger = c.logger
conf.EventCh = ch
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
segment: c.config.Segment,
}
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
if parts.Datacenter != c.config.Datacenter {
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
m.Name, parts.Datacenter)
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
c.routers.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
c.logger.Printf("[INFO] consul: removing server %s", parts)
c.routers.RemoveServer(parts)
}
}
// localEvent is called when we receive an event on the local Serf
func (c *Client) localEvent(event serf.UserEvent) {
// Handle only consul events
if !strings.HasPrefix(event.Name, "consul:") {
return
}
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
}

View File

@ -49,6 +49,17 @@ func init() {
}
}
// (Enterprise-only) NetworkSegment is the address and port configuration
// for a network segment.
type NetworkSegment struct {
Name string
Bind string
Port int
Advertise string
RPCAddr *net.TCPAddr
SerfConfig *serf.Config
}
// Config is used to configure the server
type Config struct {
// Bootstrap mode is used to bring up the first Consul server.
@ -105,6 +116,13 @@ type Config struct {
// RPCSrcAddr is the source address for outgoing RPC connections.
RPCSrcAddr *net.TCPAddr
// (Enterprise-only) The network segment this agent is part of.
Segment string
// (Enterprise-only) Segments is a list of network segments for a server to
// bind on.
Segments []NetworkSegment
// SerfLANConfig is the configuration for the intra-dc serf
SerfLANConfig *serf.Config

View File

@ -10,7 +10,6 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
// Coordinate manages queries and updates for network coordinates.
@ -18,8 +17,10 @@ type Coordinate struct {
// srv is a pointer back to the server.
srv *Server
// updates holds pending coordinate updates for the given nodes.
updates map[string]*coordinate.Coordinate
// updates holds pending coordinate updates for the given nodes. This is
// keyed by node:segment so we can get a coordinate for each segment for
// servers, and we only track the latest update per node:segment.
updates map[string]*structs.CoordinateUpdateRequest
// updatesLock synchronizes access to the updates map.
updatesLock sync.Mutex
@ -29,7 +30,7 @@ type Coordinate struct {
func NewCoordinate(srv *Server) *Coordinate {
c := &Coordinate{
srv: srv,
updates: make(map[string]*coordinate.Coordinate),
updates: make(map[string]*structs.CoordinateUpdateRequest),
}
go c.batchUpdate()
@ -58,7 +59,7 @@ func (c *Coordinate) batchApplyUpdates() error {
// incoming messages.
c.updatesLock.Lock()
pending := c.updates
c.updates = make(map[string]*coordinate.Coordinate)
c.updates = make(map[string]*structs.CoordinateUpdateRequest)
c.updatesLock.Unlock()
// Enforce the rate limit.
@ -73,12 +74,16 @@ func (c *Coordinate) batchApplyUpdates() error {
// batches.
i := 0
updates := make(structs.Coordinates, size)
for node, coord := range pending {
for _, update := range pending {
if !(i < size) {
break
}
updates[i] = &structs.Coordinate{Node: node, Coord: coord}
updates[i] = &structs.Coordinate{
Node: update.Node,
Segment: update.Segment,
Coord: update.Coord,
}
i++
}
@ -140,8 +145,9 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
}
// Add the coordinate to the map of pending updates.
key := fmt.Sprintf("%s:%s", args.Node, args.Segment)
c.updatesLock.Lock()
c.updates[args.Node] = args.Coord
c.updates[key] = args
c.updatesLock.Unlock()
return nil
}
@ -187,6 +193,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}

View File

@ -5,17 +5,18 @@ import (
"math"
"math/rand"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
// generateRandomCoordinate creates a random coordinate. This mucks with the
@ -33,15 +34,6 @@ func generateRandomCoordinate() *coordinate.Coordinate {
return coord
}
// verifyCoordinatesEqual will compare a and b and fail if they are not exactly
// equal (no floating point fuzz is considered since we are trying to make sure
// we are getting exactly the coordinates we expect, without math on them).
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
if !reflect.DeepEqual(a, b) {
t.Fatalf("coordinates are not equal: %v != %v", a, b)
}
}
func TestCoordinate_Update(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
@ -94,20 +86,17 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period
// hasn't expired.
state := s1.fsm.State()
c, err := state.CoordinateGetRaw("node1")
c, err := state.Coordinate("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if c != nil {
t.Fatalf("should be nil because the update should be batched")
}
c, err = state.CoordinateGetRaw("node2")
verify.Values(t, "", c, lib.CoordinateSet{})
c, err = state.Coordinate("node2")
if err != nil {
t.Fatalf("err: %v", err)
}
if c != nil {
t.Fatalf("should be nil because the update should be batched")
}
verify.Values(t, "", c, lib.CoordinateSet{})
// Send another update for the second node. It should take precedence
// since there will be two updates in the same batch.
@ -118,22 +107,23 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
c, err = state.CoordinateGetRaw("node1")
c, err = state.Coordinate("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
t.Fatalf("should return a coordinate but it's nil")
expected := lib.CoordinateSet{
"": arg1.Coord,
}
verifyCoordinatesEqual(t, c, arg1.Coord)
c, err = state.CoordinateGetRaw("node2")
verify.Values(t, "", c, expected)
c, err = state.Coordinate("node2")
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
t.Fatalf("should return a coordinate but it's nil")
expected = lib.CoordinateSet{
"": arg2.Coord,
}
verifyCoordinatesEqual(t, c, arg2.Coord)
verify.Values(t, "", c, expected)
// Register a bunch of additional nodes.
spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
@ -165,11 +155,11 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0
for i := 0; i < spamLen; i++ {
c, err = state.CoordinateGetRaw(fmt.Sprintf("bogusnode%d", i))
c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i))
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
if len(c) == 0 {
numDropped++
}
}
@ -304,7 +294,7 @@ func TestCoordinate_ListDatacenters(t *testing.T) {
if err != nil {
t.Fatalf("bad: %v", err)
}
verifyCoordinatesEqual(t, c, out[0].Coordinates[0].Coord)
verify.Values(t, "", c, out[0].Coordinates[0].Coord)
}
func TestCoordinate_ListNodes(t *testing.T) {
@ -374,9 +364,9 @@ func TestCoordinate_ListNodes(t *testing.T) {
resp.Coordinates[2].Node != "foo" {
r.Fatalf("bad: %v", resp.Coordinates)
}
verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar
verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz
verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo
verify.Values(t, "", resp.Coordinates[0].Coord, arg2.Coord) // bar
verify.Values(t, "", resp.Coordinates[1].Coord, arg3.Coord) // baz
verify.Values(t, "", resp.Coordinates[2].Coord, arg1.Coord) // foo
})
}

View File

@ -24,7 +24,7 @@ func (s *Server) FloodNotify() {
// Flood is a long-running goroutine that floods servers from the LAN to the
// given global Serf instance, such as the WAN. This will exit once either of
// the Serf instances are shut down.
func (s *Server) Flood(portFn router.FloodPortFn, global *serf.Serf) {
func (s *Server) Flood(addrFn router.FloodAddrFn, portFn router.FloodPortFn, global *serf.Serf) {
s.floodLock.Lock()
floodCh := make(chan struct{})
s.floodCh = append(s.floodCh, floodCh)
@ -61,6 +61,6 @@ func (s *Server) Flood(portFn router.FloodPortFn, global *serf.Serf) {
}
FLOOD:
router.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
router.FloodJoins(s.logger, addrFn, portFn, s.config.Datacenter, s.serfLAN, global)
}
}

View File

@ -171,8 +171,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)
@ -444,8 +444,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)
@ -748,8 +748,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err)
}
updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
{Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
}
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err)

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/serf"
)
@ -85,8 +86,17 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
// Add the consul prefix to the event name
eventName := userEventName(args.Name)
// Fire the event
return m.srv.serfLAN.UserEvent(eventName, args.Payload, false)
// Fire the event on all LAN segments
segments := m.srv.LANSegments()
var errs error
for name, segment := range segments {
err := segment.UserEvent(eventName, args.Payload, false)
if err != nil {
err = fmt.Errorf("error broadcasting event to segment %q: %v", name, err)
errs = multierror.Append(errs, err)
}
}
return errs
}
// KeyringOperation will query the WAN and LAN gossip keyrings of all nodes.
@ -130,23 +140,35 @@ func (m *Internal) KeyringOperation(
return nil
}
// executeKeyringOp executes the appropriate keyring-related function based on
// the type of keyring operation in the request. It takes the KeyManager as an
// argument, so it can handle any operation for either LAN or WAN pools.
// executeKeyringOp executes the keyring-related operation in the request
// on either the WAN or LAN pools.
func (m *Internal) executeKeyringOp(
args *structs.KeyringRequest,
reply *structs.KeyringResponses,
wan bool) {
if wan {
mgr := m.srv.KeyManagerWAN()
m.executeKeyringOpMgr(mgr, args, reply, wan)
} else {
segments := m.srv.LANSegments()
for _, segment := range segments {
mgr := segment.KeyManager()
m.executeKeyringOpMgr(mgr, args, reply, wan)
}
}
}
// executeKeyringOpMgr executes the appropriate keyring-related function based on
// the type of keyring operation in the request. It takes the KeyManager as an
// argument, so it can handle any operation for either LAN or WAN pools.
func (m *Internal) executeKeyringOpMgr(
mgr *serf.KeyManager,
args *structs.KeyringRequest,
reply *structs.KeyringResponses,
wan bool) {
var serfResp *serf.KeyResponse
var err error
var mgr *serf.KeyManager
if wan {
mgr = m.srv.KeyManagerWAN()
} else {
mgr = m.srv.KeyManagerLAN()
}
opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor}
switch args.Operation {

View File

@ -63,8 +63,10 @@ func (s *Server) monitorLeadership() {
func (s *Server) leaderLoop(stopCh chan struct{}) {
// Fire a user event indicating a new leader
payload := []byte(s.config.NodeName)
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
for name, segment := range s.LANSegments() {
if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event on segment %q: %v", name, err)
}
}
// Reconcile channel is only used once initial reconcile
@ -439,7 +441,9 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
return true
}
if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
if valid, parts := metadata.IsConsulServer(member); valid &&
parts.Segment == "" &&
parts.Datacenter == s.config.Datacenter {
return true
}
return false

View File

@ -15,6 +15,7 @@ type lanMergeDelegate struct {
dc string
nodeID types.NodeID
nodeName string
segment string
}
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
@ -53,6 +54,10 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter)
}
if segment := m.Tags["segment"]; segment != md.segment {
return fmt.Errorf("Member '%s' part of wrong segment '%s' (expected '%s')", m.Name, segment, md.segment)
}
}
return nil
}

View File

@ -101,6 +101,7 @@ func TestMerge_LAN(t *testing.T) {
dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0",
segment: "",
}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {

View File

@ -89,7 +89,7 @@ func Compile(query *structs.PreparedQuery) (*CompiledTemplate, error) {
// prefix it will be expected to run with. The results might not make
// sense and create a valid service to lookup, but it should render
// without any errors.
if _, err = ct.Render(ct.query.Name); err != nil {
if _, err = ct.Render(ct.query.Name, structs.QuerySource{}); err != nil {
return nil, err
}
@ -99,7 +99,7 @@ func Compile(query *structs.PreparedQuery) (*CompiledTemplate, error) {
// Render takes a compiled template and renders it for the given name. For
// example, if the user looks up foobar.query.consul via DNS then we will call
// this function with "foobar" on the compiled template.
func (ct *CompiledTemplate) Render(name string) (*structs.PreparedQuery, error) {
func (ct *CompiledTemplate) Render(name string, source structs.QuerySource) (*structs.PreparedQuery, error) {
// Make it "safe" to render a default structure.
if ct == nil {
return nil, fmt.Errorf("Cannot render an uncompiled template")
@ -156,6 +156,10 @@ func (ct *CompiledTemplate) Render(name string) (*structs.PreparedQuery, error)
Type: ast.TypeString,
Value: strings.TrimPrefix(name, query.Name),
},
"agent.segment": ast.Variable{
Type: ast.TypeString,
Value: source.Segment,
},
},
FuncMap: map[string]ast.Function{
"match": match,

View File

@ -29,6 +29,7 @@ var (
"${match(0)}",
"${match(1)}",
"${match(2)}",
"${agent.segment}",
},
},
Tags: []string{
@ -38,11 +39,13 @@ var (
"${match(0)}",
"${match(1)}",
"${match(2)}",
"${agent.segment}",
},
NodeMeta: map[string]string{
"foo": "${name.prefix}",
"bar": "${match(0)}",
"baz": "${match(1)}",
"zoo": "${agent.segment}",
},
},
}
@ -83,7 +86,7 @@ func renderBench(b *testing.B, query *structs.PreparedQuery) {
}
for i := 0; i < b.N; i++ {
_, err := compiled.Render("hello-bench-mark")
_, err := compiled.Render("hello-bench-mark", structs.QuerySource{})
if err != nil {
b.Fatalf("err: %v", err)
}
@ -121,7 +124,7 @@ func TestTemplate_Compile(t *testing.T) {
query.Template.Type = structs.QueryTemplateTypeNamePrefixMatch
query.Template.Regexp = "^(hello)there$"
query.Service.Service = "${name.full}"
query.Service.Tags = []string{"${match(1)}"}
query.Service.Tags = []string{"${match(1)}", "${agent.segment}"}
backup, err := copystructure.Copy(query)
if err != nil {
t.Fatalf("err: %v", err)
@ -135,7 +138,7 @@ func TestTemplate_Compile(t *testing.T) {
}
// Do a sanity check render on it.
actual, err := ct.Render("hellothere")
actual, err := ct.Render("hellothere", structs.QuerySource{Segment: "segment-foo"})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -150,6 +153,7 @@ func TestTemplate_Compile(t *testing.T) {
Service: "hellothere",
Tags: []string{
"hello",
"segment-foo",
},
},
}
@ -201,7 +205,7 @@ func TestTemplate_Render(t *testing.T) {
t.Fatalf("err: %v", err)
}
actual, err := ct.Render("unused")
actual, err := ct.Render("unused", structs.QuerySource{})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -218,7 +222,7 @@ func TestTemplate_Render(t *testing.T) {
Regexp: "^(.*?)-(.*?)-(.*)$",
},
Service: structs.ServiceQuery{
Service: "${name.prefix} xxx ${name.full} xxx ${name.suffix}",
Service: "${name.prefix} xxx ${name.full} xxx ${name.suffix} xxx ${agent.segment}",
Tags: []string{
"${match(-1)}",
"${match(0)}",
@ -238,7 +242,7 @@ func TestTemplate_Render(t *testing.T) {
// Run a case that matches the regexp.
{
actual, err := ct.Render("hello-foo-bar-none")
actual, err := ct.Render("hello-foo-bar-none", structs.QuerySource{Segment: "segment-bar"})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -249,7 +253,7 @@ func TestTemplate_Render(t *testing.T) {
Regexp: "^(.*?)-(.*?)-(.*)$",
},
Service: structs.ServiceQuery{
Service: "hello- xxx hello-foo-bar-none xxx foo-bar-none",
Service: "hello- xxx hello-foo-bar-none xxx foo-bar-none xxx segment-bar",
Tags: []string{
"",
"hello-foo-bar-none",
@ -269,7 +273,7 @@ func TestTemplate_Render(t *testing.T) {
// Run a case that doesn't match the regexp
{
actual, err := ct.Render("hello-nope")
actual, err := ct.Render("hello-nope", structs.QuerySource{Segment: "segment-bar"})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -280,7 +284,7 @@ func TestTemplate_Render(t *testing.T) {
Regexp: "^(.*?)-(.*?)-(.*)$",
},
Service: structs.ServiceQuery{
Service: "hello- xxx hello-nope xxx nope",
Service: "hello- xxx hello-nope xxx nope xxx segment-bar",
Tags: []string{
"",
"",
@ -307,7 +311,7 @@ func TestTemplate_Render(t *testing.T) {
RemoveEmptyTags: true,
},
Service: structs.ServiceQuery{
Service: "${name.prefix} xxx ${name.full} xxx ${name.suffix}",
Service: "${name.prefix} xxx ${name.full} xxx ${name.suffix} xxx ${agent.segment}",
Tags: []string{
"${match(-1)}",
"${match(0)}",
@ -326,7 +330,7 @@ func TestTemplate_Render(t *testing.T) {
// Run a case that matches the regexp, removing empty tags.
{
actual, err := ct.Render("hello-foo-bar-none")
actual, err := ct.Render("hello-foo-bar-none", structs.QuerySource{Segment: "segment-baz"})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -338,7 +342,7 @@ func TestTemplate_Render(t *testing.T) {
RemoveEmptyTags: true,
},
Service: structs.ServiceQuery{
Service: "hello- xxx hello-foo-bar-none xxx foo-bar-none",
Service: "hello- xxx hello-foo-bar-none xxx foo-bar-none xxx segment-baz",
Tags: []string{
"hello-foo-bar-none",
"hello",
@ -355,7 +359,7 @@ func TestTemplate_Render(t *testing.T) {
// Run a case that doesn't match the regexp, removing empty tags.
{
actual, err := ct.Render("hello-nope")
actual, err := ct.Render("hello-nope", structs.QuerySource{Segment: "segment-baz"})
if err != nil {
t.Fatalf("err: %v", err)
}
@ -367,7 +371,7 @@ func TestTemplate_Render(t *testing.T) {
RemoveEmptyTags: true,
},
Service: structs.ServiceQuery{
Service: "hello- xxx hello-nope xxx nope",
Service: "hello- xxx hello-nope xxx nope xxx segment-baz",
Tags: []string{
"42",
},

View File

@ -182,7 +182,7 @@ func parseService(svc *structs.ServiceQuery) error {
}
// Make sure the metadata filters are valid
if err := structs.ValidateMetadata(svc.NodeMeta); err != nil {
if err := structs.ValidateMetadata(svc.NodeMeta, true); err != nil {
return err
}
@ -298,7 +298,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
// Try to locate the query.
state := p.srv.fsm.State()
_, query, err := state.PreparedQueryResolve(args.QueryIDOrName)
_, query, err := state.PreparedQueryResolve(args.QueryIDOrName, args.Agent)
if err != nil {
return err
}
@ -345,7 +345,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Try to locate the query.
state := p.srv.fsm.State()
_, query, err := state.PreparedQueryResolve(args.QueryIDOrName)
_, query, err := state.PreparedQueryResolve(args.QueryIDOrName, args.Agent)
if err != nil {
return err
}

View File

@ -46,10 +46,10 @@ const (
)
// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
func (s *Server) listen(listener net.Listener) {
for {
// Accept a connection
conn, err := s.Listener.Accept()
conn, err := listener.Accept()
if err != nil {
if s.shutdown {
return

View File

@ -6,7 +6,6 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
)
// nodeSorter takes a list of nodes and a parallel vector of distances and
@ -19,15 +18,16 @@ type nodeSorter struct {
// newNodeSorter returns a new sorter for the given source coordinate and set of
// nodes.
func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (sort.Interface, error) {
func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node)
other, err := state.Coordinate(node.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &nodeSorter{nodes, vec}, nil
}
@ -58,15 +58,16 @@ type serviceNodeSorter struct {
// newServiceNodeSorter returns a new sorter for the given source coordinate and
// set of service nodes.
func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.ServiceNodes) (sort.Interface, error) {
func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.ServiceNodes) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node)
other, err := state.Coordinate(node.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &serviceNodeSorter{nodes, vec}, nil
}
@ -97,15 +98,16 @@ type healthCheckSorter struct {
// newHealthCheckSorter returns a new sorter for the given source coordinate and
// set of health checks with nodes.
func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.HealthChecks) (sort.Interface, error) {
func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.HealthChecks) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(checks))
for i, check := range checks {
coord, err := state.CoordinateGetRaw(check.Node)
other, err := state.Coordinate(check.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &healthCheckSorter{checks, vec}, nil
}
@ -136,15 +138,16 @@ type checkServiceNodeSorter struct {
// newCheckServiceNodeSorter returns a new sorter for the given source coordinate
// and set of nodes with health checks.
func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes structs.CheckServiceNodes) (sort.Interface, error) {
func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.CheckServiceNodes) (sort.Interface, error) {
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node.Node)
other, err := state.Coordinate(node.Node.Node)
if err != nil {
return nil, err
}
vec[i] = lib.ComputeDistance(c, coord)
c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
}
return &checkServiceNodeSorter{nodes, vec}, nil
}
@ -166,16 +169,16 @@ func (n *checkServiceNodeSorter) Less(i, j int) bool {
}
// newSorterByDistanceFrom returns a sorter for the given type.
func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interface{}) (sort.Interface, error) {
func (s *Server) newSorterByDistanceFrom(cs lib.CoordinateSet, subj interface{}) (sort.Interface, error) {
switch v := subj.(type) {
case structs.Nodes:
return s.newNodeSorter(c, v)
return s.newNodeSorter(cs, v)
case structs.ServiceNodes:
return s.newServiceNodeSorter(c, v)
return s.newServiceNodeSorter(cs, v)
case structs.HealthChecks:
return s.newHealthCheckSorter(c, v)
return s.newHealthCheckSorter(cs, v)
case structs.CheckServiceNodes:
return s.newCheckServiceNodeSorter(c, v)
return s.newCheckServiceNodeSorter(cs, v)
default:
panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj))
}
@ -197,19 +200,19 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
return nil
}
// There won't always be a coordinate for the source node. If there's not
// one then we can bail out because there's no meaning for the sort.
// There won't always be coordinates for the source node. If there are
// none then we can bail out because there's no meaning for the sort.
state := s.fsm.State()
coord, err := state.CoordinateGetRaw(source.Node)
cs, err := state.Coordinate(source.Node)
if err != nil {
return err
}
if coord == nil {
if len(cs) == 0 {
return nil
}
// Do the sort!
sorter, err := s.newSorterByDistanceFrom(coord, subj)
sorter, err := s.newSorterByDistanceFrom(cs, subj)
if err != nil {
return err
}

View File

@ -0,0 +1,48 @@
// +build !ent
package consul
import (
"net"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/serf/serf"
)
// LANSegmentMembers is used to return the members of the given LAN segment.
func (s *Server) LANSegmentMembers(segment string) ([]serf.Member, error) {
if segment == "" {
return s.LANMembers(), nil
}
return nil, structs.ErrSegmentsNotSupported
}
// LANSegmentAddr is used to return the address used for the given LAN segment.
func (s *Server) LANSegmentAddr(name string) string {
return ""
}
// setupSegmentRPC returns an error if any segments are defined since the OSS
// version of Consul doesn't support them.
func (s *Server) setupSegmentRPC() (map[string]net.Listener, error) {
if len(s.config.Segments) > 0 {
return nil, structs.ErrSegmentsNotSupported
}
return nil, nil
}
// setupSegments returns an error if any segments are defined since the OSS
// version of Consul doesn't support them.
func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string]net.Listener) error {
if len(config.Segments) > 0 {
return structs.ErrSegmentsNotSupported
}
return nil
}
// floodSegments is a NOP in the OSS version of Consul.
func (s *Server) floodSegments(config *Config) {
}

View File

@ -162,6 +162,9 @@ type Server struct {
// which contains all the DC nodes
serfLAN *serf.Serf
// segmentLAN maps segment names to their Serf cluster
segmentLAN map[string]*serf.Serf
// serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
@ -300,6 +303,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
@ -336,6 +340,13 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
}
// Initialize any extra RPC listeners for segments.
segmentListeners, err := s.setupSegmentRPC()
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start segment RPC layer: %v", err)
}
// Initialize the Raft server.
if err := s.setupRaft(); err != nil {
s.Shutdown()
@ -353,7 +364,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN)
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
@ -368,14 +379,24 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
// Initialize the LAN Serf.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN)
// Initialize the LAN segments before the default LAN Serf so we have
// updated port information to publish there.
if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to setup network segments: %v", err)
}
// Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
go s.lanEventHandler()
// Start the flooders after the LAN event handler is wired up.
s.floodSegments(config)
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
@ -390,7 +411,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
}
return 0, false
}
go s.Flood(portFn, s.serfWAN)
go s.Flood(nil, portFn, s.serfWAN)
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
@ -402,7 +423,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
}
// Start listening for RPC requests.
go s.listen()
go s.listen(s.Listener)
// Start listeners for any segments with separate RPC listeners.
for _, listener := range segmentListeners {
go s.listen(listener)
}
// Start the metrics handlers.
go s.sessionStats()
@ -413,67 +439,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
return s, nil
}
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int) (*serf.Serf, error) {
addr := s.Listener.Addr().(*net.TCPAddr)
conf.Init()
if wan {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["id"] = string(s.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
}
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
if s.config.UseTLS {
conf.Tags["use_tls"] = "1"
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.Logger = s.logger
conf.EventCh = ch
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
}
}
// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we have an unclean exit then attempt to close the Raft store.
@ -936,6 +901,17 @@ func (s *Server) Encrypted() bool {
return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
}
// LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf {
segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1)
segments[""] = s.serfLAN
for name, segment := range s.segmentLAN {
segments[name] = segment
}
return segments
}
// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
method string
@ -1047,8 +1023,21 @@ func (s *Server) Stats() map[string]map[string]string {
}
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) {
return s.serfLAN.GetCoordinate()
func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
lan, err := s.serfLAN.GetCoordinate()
if err != nil {
return nil, err
}
cs := lib.CoordinateSet{"": lan}
for name, segment := range s.segmentLAN {
c, err := segment.GetCoordinate()
if err != nil {
return nil, err
}
cs[name] = c
}
return cs, nil
}
// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.

View File

@ -1,10 +1,14 @@
package consul
import (
"fmt"
"net"
"path/filepath"
"strings"
"time"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
@ -24,6 +28,77 @@ const (
peerRetryBase = 1 * time.Second
)
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int,
segment string, listener net.Listener) (*serf.Serf, error) {
conf.Init()
if wan {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["segment"] = segment
if segment == "" {
for _, s := range s.config.Segments {
conf.Tags["sl_"+s.Name] = net.JoinHostPort(s.Advertise, fmt.Sprintf("%d", s.Port))
}
}
conf.Tags["id"] = string(s.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
addr := listener.Addr().(*net.TCPAddr)
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
}
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
if s.config.UseTLS {
conf.Tags["use_tls"] = "1"
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.Logger = s.logger
conf.EventCh = ch
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
segment: segment,
}
}
// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// userEventName computes the name of a user event
func userEventName(name string) string {
return userEventPrefix + name
@ -126,7 +201,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m)
if !ok {
if !ok || serverMeta.Segment != "" {
continue
}
s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta)
@ -262,7 +337,7 @@ func (s *Server) maybeBootstrap() {
func (s *Server) lanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m)
if !ok {
if !ok || serverMeta.Segment != "" {
continue
}
s.logger.Printf("[INFO] consul: Removing LAN server %s", serverMeta)

View File

@ -370,12 +370,12 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
}
}
// Delete any coordinate associated with this node.
coord, err := tx.First("coordinates", "id", nodeName)
// Delete any coordinates associated with this node.
coords, err := tx.Get("coordinates", "node", nodeName)
if err != nil {
return fmt.Errorf("failed coordinate lookup: %s", err)
}
if coord != nil {
for coord := coords.Next(); coord != nil; coord = coords.Next() {
if err := tx.Delete("coordinates", coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err)
}

View File

@ -4,8 +4,8 @@ import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
// Coordinates is used to pull all the coordinates from the snapshot.
@ -40,26 +40,23 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
return nil
}
// CoordinateGetRaw queries for the coordinate of the given node. This is an
// unusual state store method because it just returns the raw coordinate or
// nil, none of the Raft or node information is returned. This hits the 90%
// internal-to-Consul use case for this data, and this isn't exposed via an
// endpoint, so it doesn't matter that the Raft info isn't available.
func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Pull the full coordinate entry.
coord, err := tx.First("coordinates", "id", node)
iter, err := tx.Get("coordinates", "node", node)
if err != nil {
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
// Pick out just the raw coordinate.
if coord != nil {
return coord.(*structs.Coordinate).Coord, nil
results := make(lib.CoordinateSet)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
coord := raw.(*structs.Coordinate)
results[coord.Segment] = coord.Coord
}
return nil, nil
return results, nil
}
// Coordinates queries for all nodes with coordinates.

View File

@ -3,12 +3,13 @@ package state
import (
"math"
"math/rand"
"reflect"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
// generateRandomCoordinate creates a random coordinate. This mucks with the
@ -30,25 +31,22 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
s := testStateStore(t)
// Make sure the coordinates list starts out empty, and that a query for
// a raw coordinate for a nonexistent node doesn't do anything bad.
// a per-node coordinate for a nonexistent node doesn't do anything bad.
ws := memdb.NewWatchSet()
idx, coords, err := s.Coordinates(ws)
idx, all, err := s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
coord, err := s.CoordinateGetRaw("nope")
verify.Values(t, "", all, structs.Coordinates{})
coords, err := s.Coordinate("nope")
if err != nil {
t.Fatalf("err: %s", err)
}
if coord != nil {
t.Fatalf("bad: %#v", coord)
}
verify.Values(t, "", coords, lib.CoordinateSet{})
// Make an update for nodes that don't exist and make sure they get
// ignored.
@ -72,16 +70,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should still be empty, though applying an empty batch does bump
// the table index.
ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws)
idx, all, err = s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, structs.Coordinates{})
// Register the nodes then do the update again.
testRegisterNode(t, s, 1, "node1")
@ -95,26 +91,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should go through now.
ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws)
idx, all, err = s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, updates)
// Also verify the raw coordinate interface.
// Also verify the per-node coordinate interface.
for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node)
coords, err := s.Coordinate(update.Node)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, update.Coord) {
t.Fatalf("bad: %#v", coord)
expected := lib.CoordinateSet{
"": update.Coord,
}
verify.Values(t, "", coords, expected)
}
// Update the coordinate for one of the nodes.
@ -127,26 +122,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
}
// Verify it got applied.
idx, coords, err = s.Coordinates(nil)
idx, all, err = s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, updates)
// And check the raw coordinate version of the same thing.
// And check the per-node coordinate version of the same thing.
for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node)
coords, err := s.Coordinate(update.Node)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, update.Coord) {
t.Fatalf("bad: %#v", coord)
expected := lib.CoordinateSet{
"": update.Coord,
}
verify.Values(t, "", coords, expected)
}
// Apply an invalid update and make sure it gets ignored.
@ -162,16 +156,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Verify we are at the previous state, though the empty batch does bump
// the table index.
idx, coords, err = s.Coordinates(nil)
idx, all, err = s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, updates)
}
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
@ -181,8 +173,14 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
testRegisterNode(t, s, 1, "node1")
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
Node: "node1",
Segment: "alpha",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node1",
Segment: "beta",
Coord: generateRandomCoordinate(),
},
}
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
@ -190,13 +188,15 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure it's in there.
coord, err := s.CoordinateGetRaw("node1")
coords, err := s.Coordinate("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coord, updates[0].Coord) {
t.Fatalf("bad: %#v", coord)
expected := lib.CoordinateSet{
"alpha": updates[0].Coord,
"beta": updates[1].Coord,
}
verify.Values(t, "", coords, expected)
// Now delete the node.
if err := s.DeleteNode(3, "node1"); err != nil {
@ -204,25 +204,21 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure the coordinate is gone.
coord, err = s.CoordinateGetRaw("node1")
coords, err = s.Coordinate("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if coord != nil {
t.Fatalf("bad: %#v", coord)
}
verify.Values(t, "", coords, lib.CoordinateSet{})
// Make sure the index got updated.
idx, coords, err := s.Coordinates(nil)
idx, all, err := s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
if coords != nil {
t.Fatalf("bad: %#v", coords)
}
verify.Values(t, "", all, structs.Coordinates{})
}
func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
@ -291,9 +287,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
// The snapshot will have the bad update in it, since we don't filter on
// the read side.
if !reflect.DeepEqual(dump, append(updates, badUpdate)) {
t.Fatalf("bad: %#v", dump)
}
verify.Values(t, "", dump, append(updates, badUpdate))
// Restore the values into a new state store.
func() {
@ -312,9 +306,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(res, updates) {
t.Fatalf("bad: %#v", res)
}
verify.Values(t, "", res, updates)
// Check that the index was updated (note that it got passed
// in during the restore).

View File

@ -256,7 +256,7 @@ func (s *Store) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *st
// PreparedQueryResolve returns the given prepared query by looking up an ID or
// Name. If the query was looked up by name and it's a template, then the
// template will be rendered before it is returned.
func (s *Store) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
func (s *Store) PreparedQueryResolve(queryIDOrName string, source structs.QuerySource) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -293,7 +293,7 @@ func (s *Store) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.Pre
prep := func(wrapped interface{}) (uint64, *structs.PreparedQuery, error) {
wrapper := wrapped.(*queryWrapper)
if prepared_query.IsTemplate(wrapper.PreparedQuery) {
render, err := wrapper.ct.Render(queryIDOrName)
render, err := wrapper.ct.Render(queryIDOrName, source)
if err != nil {
return idx, nil, err
}

View File

@ -554,7 +554,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
// Try to lookup a query that's not there using something that looks
// like a real ID.
idx, actual, err := s.PreparedQueryResolve(query.ID)
idx, actual, err := s.PreparedQueryResolve(query.ID, structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -567,7 +567,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
// Try to lookup a query that's not there using something that looks
// like a name
idx, actual, err = s.PreparedQueryResolve(query.Name)
idx, actual, err = s.PreparedQueryResolve(query.Name, structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -600,7 +600,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
ModifyIndex: 3,
},
}
idx, actual, err = s.PreparedQueryResolve(query.ID)
idx, actual, err = s.PreparedQueryResolve(query.ID, structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -612,7 +612,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
}
// Read it back using the name and verify it again.
idx, actual, err = s.PreparedQueryResolve(query.Name)
idx, actual, err = s.PreparedQueryResolve(query.Name, structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -625,7 +625,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
// Make sure an empty lookup is well-behaved if there are actual queries
// in the state store.
idx, actual, err = s.PreparedQueryResolve("")
idx, actual, err = s.PreparedQueryResolve("", structs.QuerySource{})
if err != ErrMissingQueryID {
t.Fatalf("bad: %v ", err)
}
@ -681,7 +681,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
ModifyIndex: 4,
},
}
idx, actual, err = s.PreparedQueryResolve("prod-mongodb")
idx, actual, err = s.PreparedQueryResolve("prod-mongodb", structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -708,7 +708,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
ModifyIndex: 5,
},
}
idx, actual, err = s.PreparedQueryResolve("prod-redis-foobar")
idx, actual, err = s.PreparedQueryResolve("prod-redis-foobar", structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -735,7 +735,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
ModifyIndex: 4,
},
}
idx, actual, err = s.PreparedQueryResolve("prod-")
idx, actual, err = s.PreparedQueryResolve("prod-", structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -748,7 +748,7 @@ func TestStateStore_PreparedQueryResolve(t *testing.T) {
// Make sure you can't run a prepared query template by ID, since that
// makes no sense.
_, _, err = s.PreparedQueryResolve(tmpl1.ID)
_, _, err = s.PreparedQueryResolve(tmpl1.ID, structs.QuerySource{})
if err == nil || !strings.Contains(err.Error(), "prepared query templates can only be resolved up by name") {
t.Fatalf("bad: %v", err)
}
@ -960,7 +960,7 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
// Make sure the second query, which is a template, was compiled
// and can be resolved.
_, query, err := s.PreparedQueryResolve("bob-backwards-is-bob")
_, query, err := s.PreparedQueryResolve("bob-backwards-is-bob", structs.QuerySource{})
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -374,6 +374,26 @@ func coordinatesTableSchema() *memdb.TableSchema {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
// AllowMissing is required since we allow
// Segment to be an empty string.
AllowMissing: true,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Segment",
Lowercase: true,
},
},
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,

View File

@ -81,5 +81,18 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request
if out.Coordinates == nil {
out.Coordinates = make(structs.Coordinates, 0)
}
// Filter by segment if applicable
if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 {
segment := v[0]
filtered := make(structs.Coordinates, 0)
for _, coord := range out.Coordinates {
if coord.Segment == segment {
filtered = append(filtered, coord)
}
}
out.Coordinates = filtered
}
return out.Coordinates, nil
}

View File

@ -68,6 +68,7 @@ func TestCoordinate_Nodes(t *testing.T) {
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Segment: "alpha",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
}
var out struct{}
@ -99,4 +100,43 @@ func TestCoordinate_Nodes(t *testing.T) {
coordinates[1].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a nonexistant node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=nope", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 0 {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a real node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=alpha", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Make sure the empty filter works
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "bar" {
t.Fatalf("bad: %v", coordinates)
}
}

View File

@ -778,6 +778,7 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: d.agent.config.Datacenter,
Segment: d.agent.config.Segment,
Node: d.agent.config.NodeName,
},
}

View File

@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != a.Config.NodeID ||
!reflect.DeepEqual(addrs, a.Config.TaggedAddresses) ||
!reflect.DeepEqual(meta, a.Config.Meta) {
@ -828,6 +829,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != a.Config.NodeID ||
!reflect.DeepEqual(addrs, a.Config.TaggedAddresses) ||
!reflect.DeepEqual(meta, a.Config.Meta) {
@ -1364,6 +1366,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != cfg.NodeID ||
!reflect.DeepEqual(addrs, cfg.TaggedAddresses) ||
!reflect.DeepEqual(meta, cfg.Meta) {
@ -1387,6 +1390,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != cfg.NodeID ||
!reflect.DeepEqual(addrs, cfg.TaggedAddresses) ||
!reflect.DeepEqual(meta, cfg.Meta) {

View File

@ -10,6 +10,7 @@ import (
"net"
"regexp"
"strconv"
"strings"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf"
@ -27,19 +28,22 @@ func (k *Key) Equal(x *Key) bool {
// Server is used to return details of a consul server
type Server struct {
Name string
ID string
Datacenter string
Port int
WanJoinPort int
Bootstrap bool
Expect int
Build version.Version
Version int
RaftVersion int
NonVoter bool
Addr net.Addr
Status serf.MemberStatus
Name string
ID string
Datacenter string
Segment string
Port int
SegmentAddrs map[string]string
SegmentPorts map[string]int
WanJoinPort int
Bootstrap bool
Expect int
Build version.Version
Version int
RaftVersion int
NonVoter bool
Addr net.Addr
Status serf.MemberStatus
// If true, use TLS when connecting to this server
UseTLS bool
@ -73,8 +77,8 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
}
datacenter := m.Tags["dc"]
segment := m.Tags["segment"]
_, bootstrap := m.Tags["bootstrap"]
_, useTLS := m.Tags["use_tls"]
expect := 0
@ -93,6 +97,25 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil
}
segment_addrs := make(map[string]string)
segment_ports := make(map[string]int)
for name, value := range m.Tags {
if strings.HasPrefix(name, "sl_") {
addr, port, err := net.SplitHostPort(value)
if err != nil {
return false, nil
}
segment_port, err := strconv.Atoi(port)
if err != nil {
return false, nil
}
segment_name := strings.TrimPrefix(name, "sl_")
segment_addrs[segment_name] = addr
segment_ports[segment_name] = segment_port
}
}
build_version, err := version.NewVersion(versionFormat.FindString(m.Tags["build"]))
if err != nil {
return false, nil
@ -127,20 +150,23 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &Server{
Name: m.Name,
ID: m.Tags["id"],
Datacenter: datacenter,
Port: port,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Build: *build_version,
Version: vsn,
RaftVersion: raft_vsn,
Status: m.Status,
NonVoter: nonVoter,
UseTLS: useTLS,
Name: m.Name,
ID: m.Tags["id"],
Datacenter: datacenter,
Segment: segment,
Port: port,
SegmentAddrs: segment_addrs,
SegmentPorts: segment_ports,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Build: *build_version,
Version: vsn,
RaftVersion: raft_vsn,
Status: m.Status,
NonVoter: nonVoter,
UseTLS: useTLS,
}
return true, parts
}

View File

@ -96,6 +96,7 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
Segment: s.agent.config.Segment,
},
}
s.parseSource(req, &args.Source)
@ -140,6 +141,7 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r
Agent: structs.QuerySource{
Node: s.agent.config.NodeName,
Datacenter: s.agent.config.Datacenter,
Segment: s.agent.config.Segment,
},
}
s.parseSource(req, &args.Source)

View File

@ -10,6 +10,10 @@ import (
"github.com/hashicorp/serf/serf"
)
// FloodAddrFn gets the address to use for a given server when flood-joining. This
// will return false if it doesn't have one.
type FloodAddrFn func(*metadata.Server) (string, bool)
// FloodPortFn gets the port to use for a given server when flood-joining. This
// will return false if it doesn't have one.
type FloodPortFn func(*metadata.Server) (int, bool)
@ -19,7 +23,7 @@ type FloodPortFn func(*metadata.Server) (int, bool)
// local area are of the form <node> and those in the global area are of the
// form <node>.<dc> as is done for WAN and general network areas in Consul
// Enterprise.
func FloodJoins(logger *log.Logger, portFn FloodPortFn,
func FloodJoins(logger *log.Logger, addrFn FloodAddrFn, portFn FloodPortFn,
localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf) {
// Names in the global Serf have the datacenter suffixed.
@ -64,6 +68,11 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn,
logger.Printf("[DEBUG] consul: Failed to flood-join %q (bad address %q): %v",
server.Name, server.Addr.String(), err)
}
if addrFn != nil {
if a, ok := addrFn(server); ok {
addr = a
}
}
// Let the callback see if it can get the port number, otherwise
// leave it blank to behave as if we just supplied an address.

19
agent/segment_stub.go Normal file
View File

@ -0,0 +1,19 @@
// +build !ent
package agent
import (
"github.com/hashicorp/consul/agent/structs"
)
func ValidateSegments(conf *Config) error {
if conf.Segment != "" {
return structs.ErrSegmentsNotSupported
}
if len(conf.Segments) > 0 {
return structs.ErrSegmentsNotSupported
}
return nil
}

View File

@ -20,6 +20,7 @@ var (
ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
ErrSegmentsNotSupported = fmt.Errorf("Network segments are not supported in this version of Consul")
)
type MessageType uint8
@ -74,6 +75,9 @@ const (
// metaValueMaxLength is the maximum allowed length of a metadata value
metaValueMaxLength = 512
// MetaSegmentKey is the node metadata key used to store the node's network segment
MetaSegmentKey = "consul-network-segment"
// MaxLockDelay provides a maximum LockDelay value for
// a session. Any value above this will not be respected.
MaxLockDelay = 60 * time.Second
@ -239,6 +243,7 @@ func (r *DeregisterRequest) RequestDatacenter() string {
// coordinates.
type QuerySource struct {
Datacenter string
Segment string
Node string
}
@ -307,13 +312,13 @@ type Node struct {
type Nodes []*Node
// ValidateMeta validates a set of key/value pairs from the agent config
func ValidateMetadata(meta map[string]string) error {
func ValidateMetadata(meta map[string]string, allowConsulPrefix bool) error {
if len(meta) > metaMaxKeyPairs {
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
}
for key, value := range meta {
if err := validateMetaPair(key, value); err != nil {
if err := validateMetaPair(key, value, allowConsulPrefix); err != nil {
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
}
}
@ -322,7 +327,7 @@ func ValidateMetadata(meta map[string]string) error {
}
// validateMetaPair checks that the given key/value pair is in a valid format
func validateMetaPair(key, value string) error {
func validateMetaPair(key, value string, allowConsulPrefix bool) error {
if key == "" {
return fmt.Errorf("Key cannot be blank")
}
@ -332,7 +337,7 @@ func validateMetaPair(key, value string) error {
if len(key) > metaKeyMaxLength {
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
}
if strings.HasPrefix(key, metaKeyReservedPrefix) {
if strings.HasPrefix(key, metaKeyReservedPrefix) && !allowConsulPrefix {
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
}
if len(value) > metaValueMaxLength {
@ -747,8 +752,9 @@ type IndexedSessions struct {
// Coordinate stores a node name with its associated network coordinate.
type Coordinate struct {
Node string
Coord *coordinate.Coordinate
Node string
Segment string
Coord *coordinate.Coordinate
}
type Coordinates []*Coordinate
@ -781,6 +787,7 @@ type DatacenterMap struct {
type CoordinateUpdateRequest struct {
Datacenter string
Node string
Segment string
Coord *coordinate.Coordinate
WriteRequest
}

View File

@ -482,7 +482,7 @@ func TestStructs_ValidateMetadata(t *testing.T) {
"key2": "value2",
}
// Should succeed
if err := ValidateMetadata(meta); err != nil {
if err := ValidateMetadata(meta, false); err != nil {
t.Fatalf("err: %s", err)
}
@ -490,7 +490,7 @@ func TestStructs_ValidateMetadata(t *testing.T) {
meta = map[string]string{
"": "value1",
}
if err := ValidateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
if err := ValidateMetadata(meta, false); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
t.Fatalf("should have failed")
}
@ -499,37 +499,53 @@ func TestStructs_ValidateMetadata(t *testing.T) {
for i := 0; i < metaMaxKeyPairs+1; i++ {
meta[string(i)] = "value"
}
if err := ValidateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") {
if err := ValidateMetadata(meta, false); !strings.Contains(err.Error(), "cannot contain more than") {
t.Fatalf("should have failed")
}
// Should not error
meta = map[string]string{
metaKeyReservedPrefix + "key": "value1",
}
// Should fail
if err := ValidateMetadata(meta, false); err == nil || !strings.Contains(err.Error(), "reserved for internal use") {
t.Fatalf("err: %s", err)
}
// Should succeed
if err := ValidateMetadata(meta, true); err != nil {
t.Fatalf("err: %s", err)
}
}
func TestStructs_validateMetaPair(t *testing.T) {
longKey := strings.Repeat("a", metaKeyMaxLength+1)
longValue := strings.Repeat("b", metaValueMaxLength+1)
pairs := []struct {
Key string
Value string
Error string
Key string
Value string
Error string
AllowConsulPrefix bool
}{
// valid pair
{"key", "value", ""},
{"key", "value", "", false},
// invalid, blank key
{"", "value", "cannot be blank"},
{"", "value", "cannot be blank", false},
// allowed special chars in key name
{"k_e-y", "value", ""},
{"k_e-y", "value", "", false},
// disallowed special chars in key name
{"(%key&)", "value", "invalid characters"},
{"(%key&)", "value", "invalid characters", false},
// key too long
{longKey, "value", "Key is too long"},
{longKey, "value", "Key is too long", false},
// reserved prefix
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"},
{metaKeyReservedPrefix + "key", "value", "reserved for internal use", false},
// reserved prefix, allowed
{metaKeyReservedPrefix + "key", "value", "", true},
// value too long
{"key", longValue, "Value is too long"},
{"key", longValue, "Value is too long", false},
}
for _, pair := range pairs {
err := validateMetaPair(pair.Key, pair.Value)
err := validateMetaPair(pair.Key, pair.Value, pair.AllowConsulPrefix)
if pair.Error == "" && err != nil {
t.Fatalf("should have succeeded: %v, %v", pair, err)
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {

View File

@ -235,6 +235,13 @@ func (a *TestAgent) HTTPAddr() string {
return a.srv.Addr
}
func (a *TestAgent) SegmentAddr(name string) string {
if server, ok := a.Agent.delegate.(*consul.Server); ok {
return server.LANSegmentAddr(name)
}
return ""
}
func (a *TestAgent) Client() *api.Client {
conf := api.DefaultConfig()
conf.Address = a.HTTPAddr()

View File

@ -44,6 +44,15 @@ type AgentMember struct {
DelegateCur uint8
}
// MembersOpts is used for querying member information.
type MembersOpts struct {
// WAN is whether to show members from the WAN.
WAN bool
// Segment is the LAN segment to show members.
Segment string
}
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
ID string `json:",omitempty"`
@ -256,6 +265,28 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
return out, nil
}
// MembersOpts returns the known gossip members and can be passed
// additional options for WAN/segment filtering.
func (a *Agent) MembersOpts(opts MembersOpts) ([]*AgentMember, error) {
r := a.c.newRequest("GET", "/v1/agent/members")
r.params.Set("segment", opts.Segment)
if opts.WAN {
r.params.Set("wan", "1")
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out []*AgentMember
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// ServiceRegister is used to register a new service with
// the local agent
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {

View File

@ -91,6 +91,29 @@ func TestAPI_AgentReload(t *testing.T) {
}
}
func TestAPI_AgentMembersOpts(t *testing.T) {
t.Parallel()
c, s1 := makeClient(t)
_, s2 := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
c.Datacenter = "dc2"
})
defer s1.Stop()
defer s2.Stop()
agent := c.Agent()
s2.JoinWAN(t, s1.WANAddr)
members, err := agent.MembersOpts(MembersOpts{WAN: true})
if err != nil {
t.Fatalf("err: %v", err)
}
if len(members) != 2 {
t.Fatalf("bad: %v", members)
}
}
func TestAPI_AgentMembers(t *testing.T) {
t.Parallel()
c, s := makeClient(t)

View File

@ -465,7 +465,7 @@ func NewClient(config *Config) (*Client, error) {
if config.Token == "" {
config.Token = defConfig.Token
}
client := &Client{
config: *config,
}

View File

@ -48,7 +48,9 @@ func TestAPI_CatalogNodes(t *testing.T) {
"lan": "127.0.0.1",
"wan": "127.0.0.1",
},
Meta: map[string]string{},
Meta: map[string]string{
"consul-network-segment": "",
},
CreateIndex: meta.LastIndex - 1,
ModifyIndex: meta.LastIndex,
},

View File

@ -6,8 +6,9 @@ import (
// CoordinateEntry represents a node and its associated network coordinate.
type CoordinateEntry struct {
Node string
Coord *coordinate.Coordinate
Node string
Segment string
Coord *coordinate.Coordinate
}
// CoordinateDatacenterMap has the coordinates for servers in a given datacenter

11
api/operator_segment.go Normal file
View File

@ -0,0 +1,11 @@
package api
// SegmentList returns all the available LAN segments.
func (op *Operator) SegmentList(q *QueryOptions) ([]string, *QueryMeta, error) {
var out []string
qm, err := op.c.query("/v1/operator/segment/list", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}

View File

@ -117,6 +117,7 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
f.StringVar(&cmdCfg.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.")
f.StringVar(&cmdCfg.AdvertiseAddrWan, "advertise-wan", "",
"Sets address to advertise on WAN instead of -advertise address.")
f.StringVar(&cmdCfg.Segment, "segment", "", "(Enterprise-only) Sets the network segment to join.")
f.IntVar(&cmdCfg.Protocol, "protocol", -1,
"Sets the protocol version. Defaults to latest.")
@ -224,6 +225,10 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
key, value := agent.ParseMetaPair(entry)
cmdCfg.Meta[key] = value
}
if err := structs.ValidateMetadata(cmdCfg.Meta, false); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
return nil
}
}
cfg := agent.DefaultConfig()
@ -389,6 +394,16 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
return nil
}
if cfg.Server && cfg.Segment != "" {
cmd.UI.Error("Segment option can only be set on clients")
return nil
}
if !cfg.Server && len(cfg.Segments) > 0 {
cmd.UI.Error("Segments can only be configured on servers")
return nil
}
// patch deprecated retry-join-{gce,azure,ec2)-* parameters
// into -retry-join and issue warning.
// todo(fs): this should really be in DecodeConfig where it can be tested
@ -508,11 +523,6 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
cmd.UI.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary")
}
// Verify the node metadata entries are valid
if err := structs.ValidateMetadata(cfg.Meta); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
}
// It doesn't make sense to include both UI options.
if cfg.EnableUI == true && cfg.UIDir != "" {
cmd.UI.Error("Both the ui and ui-dir flags were specified, please provide only one")
@ -804,17 +814,22 @@ func (cmd *AgentCommand) run(args []string) int {
// Let the agent know we've finished registration
agent.StartSync()
segment := config.Segment
if config.Server {
segment = "<all>"
}
cmd.UI.Output("Consul agent running!")
cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion))
cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
cmd.UI.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
cmd.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.Server, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS))
cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr,
config.Ports.SerfLan, config.Ports.SerfWan))
cmd.UI.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v",
cmd.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v",
agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming))
// Enable log streaming

View File

@ -196,8 +196,11 @@ func TestReadCliConfig(t *testing.T) {
if config.SerfLanBindAddr != "4.3.2.2" {
t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr)
}
if len(config.Meta) != 1 || config.Meta["somekey"] != "somevalue" {
t.Fatalf("expected somekey=somevalue, got %v", config.Meta)
expected := map[string]string{
"somekey": "somevalue",
}
if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected)
}
}
@ -213,11 +216,11 @@ func TestReadCliConfig(t *testing.T) {
ShutdownCh: shutdownCh,
BaseCommand: baseCommand(cli.NewMockUi()),
}
config := cmd.readConfig()
expected := map[string]string{
"somekey": "somevalue",
"otherkey": "othervalue",
}
config := cmd.readConfig()
if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected)
}

View File

@ -33,6 +33,7 @@ func (c *MembersCommand) Run(args []string) int {
var detailed bool
var wan bool
var statusFilter string
var segment string
f := c.BaseCommand.NewFlagSet(c)
f.BoolVar(&detailed, "detailed", false,
@ -43,6 +44,9 @@ func (c *MembersCommand) Run(args []string) int {
f.StringVar(&statusFilter, "status", ".*",
"If provided, output is filtered to only nodes matching the regular "+
"expression for status.")
f.StringVar(&segment, "segment", "",
"(Enterprise-only) If provided, output is filtered to only nodes in"+
"the given segment.")
if err := c.BaseCommand.Parse(args); err != nil {
return 1
@ -61,16 +65,44 @@ func (c *MembersCommand) Run(args []string) int {
return 1
}
members, err := client.Agent().Members(wan)
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err))
return 1
// Check if we queried a server and need to query for members in all segments.
var members []*consulapi.AgentMember
if !wan && segment == "" {
self, err := client.Agent().Self()
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving agent info: %s", err))
return 1
}
if self["Config"]["Server"].(bool) {
segmentMembers, err := getSegmentMembers(client)
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members in segments: %s", err))
return 1
}
members = segmentMembers
}
} else {
var err error
members, err = client.Agent().MembersOpts(consulapi.MembersOpts{
WAN: wan,
Segment: segment,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err))
return 1
}
}
// Filter the results
n := len(members)
for i := 0; i < n; i++ {
member := members[i]
if member.Tags["segment"] == "" {
member.Tags["segment"] = "<default>"
if member.Tags["role"] == "consul" {
member.Tags["segment"] = "<all>"
}
}
statusString := serf.MemberStatus(member.Status).String()
if !statusRe.MatchString(statusString) {
members[i], members[n-1] = members[n-1], members[i]
@ -86,7 +118,7 @@ func (c *MembersCommand) Run(args []string) int {
return 2
}
sort.Sort(ByMemberName(members))
sort.Sort(ByMemberNameAndSegment(members))
// Generate the output
var result []string
@ -104,17 +136,26 @@ func (c *MembersCommand) Run(args []string) int {
}
// so we can sort members by name
type ByMemberName []*consulapi.AgentMember
type ByMemberNameAndSegment []*consulapi.AgentMember
func (m ByMemberName) Len() int { return len(m) }
func (m ByMemberName) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByMemberName) Less(i, j int) bool { return m[i].Name < m[j].Name }
func (m ByMemberNameAndSegment) Len() int { return len(m) }
func (m ByMemberNameAndSegment) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByMemberNameAndSegment) Less(i, j int) bool {
switch {
case m[i].Tags["segment"] < m[j].Tags["segment"]:
return true
case m[i].Tags["segment"] > m[j].Tags["segment"]:
return false
default:
return m[i].Name < m[j].Name
}
}
// standardOutput is used to dump the most useful information about nodes
// in a more human-friendly format
func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []string {
result := make([]string, 0, len(members))
header := "Node|Address|Status|Type|Build|Protocol|DC"
header := "Node|Address|Status|Type|Build|Protocol|DC|Segment"
result = append(result, header)
for _, member := range members {
addr := net.TCPAddr{IP: net.ParseIP(member.Addr), Port: int(member.Port)}
@ -126,19 +167,20 @@ func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []stri
build = build[:idx]
}
dc := member.Tags["dc"]
segment := member.Tags["segment"]
statusString := serf.MemberStatus(member.Status).String()
switch member.Tags["role"] {
case "node":
line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc)
line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc, segment)
result = append(result, line)
case "consul":
line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc)
line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc, segment)
result = append(result, line)
default:
line := fmt.Sprintf("%s|%s|%s|unknown|||",
line := fmt.Sprintf("%s|%s|%s|unknown||||",
member.Name, addr.String(), statusString)
result = append(result, line)
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
)
@ -145,18 +146,21 @@ func (c *RTTCommand) Run(args []string) int {
return 1
}
// See if the requested nodes are in there.
// Index all the coordinates by segment.
cs1, cs2 := make(lib.CoordinateSet), make(lib.CoordinateSet)
for _, entry := range entries {
if entry.Node == nodes[0] {
coord1 = entry.Coord
cs1[entry.Segment] = entry.Coord
}
if entry.Node == nodes[1] {
coord2 = entry.Coord
cs2[entry.Segment] = entry.Coord
}
}
if coord1 != nil && coord2 != nil {
goto SHOW_RTT
}
// See if there's a compatible set of coordinates.
coord1, coord2 = cs1.Intersect(cs2)
if coord1 != nil && coord2 != nil {
goto SHOW_RTT
}
}

16
command/segment_stub.go Normal file
View File

@ -0,0 +1,16 @@
// +build !ent
package command
import consulapi "github.com/hashicorp/consul/api"
// getSegmentMembers returns an empty list since network segments are not
// supported in OSS Consul.
func getSegmentMembers(client *consulapi.Client) ([]*consulapi.AgentMember, error) {
members, err := client.Agent().MembersOpts(consulapi.MembersOpts{})
if err != nil {
return nil, err
}
return members, nil
}

View File

@ -18,6 +18,39 @@ func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64
return a.DistanceTo(b).Seconds()
}
// CoordinateSet holds all the coordinates for a given node, indexed by network
// segment name.
type CoordinateSet map[string]*coordinate.Coordinate
// Intersect tries to return a pair of coordinates which are compatible with the
// current set and a given set. We employ some special knowledge about network
// segments to avoid doing a full intersection, since this is in several hot
// paths. This might return nil for either coordinate in the output pair if an
// intersection cannot be found. The ComputeDistance function above is designed
// to deal with that.
func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate, *coordinate.Coordinate) {
// Use the empty segment by default.
segment := ""
// If we have a single segment, then let our segment take priority since
// we are possibly a client. Any node with more than one segment can only
// be a server, which means it should be in all segments.
if len(cs) == 1 {
for s, _ := range cs {
segment = s
}
}
// Likewise for the other set.
if len(other) == 1 {
for s, _ := range other {
segment = s
}
}
return cs[segment], other[segment]
}
// GenerateCoordinate creates a new coordinate with the given distance from the
// origin. This should only be used for tests.
func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {

View File

@ -6,49 +6,148 @@ import (
"time"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
func TestRTT(t *testing.T) {
cases := []struct {
func TestRTT_ComputeDistance(t *testing.T) {
tests := []struct {
desc string
a *coordinate.Coordinate
b *coordinate.Coordinate
dist float64
}{
{
"10 ms",
GenerateCoordinate(0),
GenerateCoordinate(10 * time.Millisecond),
0.010,
},
{
"0 ms",
GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond),
0.0,
},
{
"2 ms",
GenerateCoordinate(8 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond),
0.002,
},
{
"2 ms reversed",
GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(8 * time.Millisecond),
0.002,
},
{
"a nil",
nil,
GenerateCoordinate(8 * time.Millisecond),
math.Inf(1.0),
},
{
"b nil",
GenerateCoordinate(8 * time.Millisecond),
nil,
math.Inf(1.0),
},
{
"both nil",
nil,
nil,
math.Inf(1.0),
},
}
for i, c := range cases {
dist := ComputeDistance(c.a, c.b)
if c.dist != dist {
t.Fatalf("bad (%d): %9.6f != %9.6f", i, c.dist, dist)
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
dist := ComputeDistance(tt.a, tt.b)
verify.Values(t, "", dist, tt.dist)
})
}
}
func TestRTT_Intersect(t *testing.T) {
// The numbers here don't matter, we just want a unique coordinate for
// each one.
server_1 := CoordinateSet{
"": GenerateCoordinate(1 * time.Millisecond),
"alpha": GenerateCoordinate(2 * time.Millisecond),
"beta": GenerateCoordinate(3 * time.Millisecond),
}
server_2 := CoordinateSet{
"": GenerateCoordinate(4 * time.Millisecond),
"alpha": GenerateCoordinate(5 * time.Millisecond),
"beta": GenerateCoordinate(6 * time.Millisecond),
}
client_alpha := CoordinateSet{
"alpha": GenerateCoordinate(7 * time.Millisecond),
}
client_beta_1 := CoordinateSet{
"beta": GenerateCoordinate(8 * time.Millisecond),
}
client_beta_2 := CoordinateSet{
"beta": GenerateCoordinate(9 * time.Millisecond),
}
tests := []struct {
desc string
a CoordinateSet
b CoordinateSet
c1 *coordinate.Coordinate
c2 *coordinate.Coordinate
}{
{
"nil maps",
nil, nil,
nil, nil,
},
{
"two servers",
server_1, server_2,
server_1[""], server_2[""],
},
{
"two clients",
client_beta_1, client_beta_2,
client_beta_1["beta"], client_beta_2["beta"],
},
{
"server_1 and client alpha",
server_1, client_alpha,
server_1["alpha"], client_alpha["alpha"],
},
{
"server_1 and client beta 1",
server_1, client_beta_1,
server_1["beta"], client_beta_1["beta"],
},
{
"server_1 and client alpha reversed",
client_alpha, server_1,
client_alpha["alpha"], server_1["alpha"],
},
{
"server_1 and client beta 1 reversed",
client_beta_1, server_1,
client_beta_1["beta"], server_1["beta"],
},
{
"nothing in common",
client_alpha, client_beta_1,
nil, client_beta_1["beta"],
},
{
"nothing in common reversed",
client_beta_1, client_alpha,
nil, client_alpha["alpha"],
},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
r1, r2 := tt.a.Intersect(tt.b)
verify.Values(t, "", r1, tt.c1)
verify.Values(t, "", r2, tt.c2)
})
}
}

View File

@ -58,6 +58,14 @@ type TestAddressConfig struct {
HTTP string `json:"http,omitempty"`
}
// TestNetworkSegment contains the configuration for a network segment.
type TestNetworkSegment struct {
Name string `json:"name"`
Bind string `json:"bind"`
Port int `json:"port"`
Advertise string `json:"advertise"`
}
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
NodeName string `json:"node_name"`
@ -68,6 +76,7 @@ type TestServerConfig struct {
Server bool `json:"server,omitempty"`
DataDir string `json:"data_dir,omitempty"`
Datacenter string `json:"datacenter,omitempty"`
Segments []TestNetworkSegment `json:"segments"`
DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"`
Bind string `json:"bind_addr,omitempty"`

View File

@ -101,7 +101,12 @@ function notify(message, ttl) {
// TODO: not sure how to how do to this more Ember.js-y
function tomographyMouseOver(el) {
var buf = el.getAttribute('data-node') + ' - ' + el.getAttribute('data-distance') + 'ms';
var segment = el.getAttribute('data-segment');
if (segment !== "") {
buf += ' (Segment: ' + segment + ')';
}
document.getElementById('tomography-node-info').innerHTML = buf;
}
Ember.Handlebars.helper('tomographyGraph', function(tomography, size) {
@ -146,7 +151,7 @@ Ember.Handlebars.helper('tomographyGraph', function(tomography, size) {
}
distances.forEach(function (d, i) {
buf += ' <line transform="rotate(' + (i * 360 / n) + ')" y2="' + (-insetSize * (d.distance / max)) + '" ' +
'data-node="' + d.node + '" data-distance="' + d.distance + '" onmouseover="tomographyMouseOver(this);"/>';
'data-node="' + d.node + '" data-distance="' + d.distance + '" data-segment="' + d.segment + '" onmouseover="tomographyMouseOver(this);"/>';
});
buf += '' +
' </g>' +

View File

@ -292,10 +292,11 @@ App.NodesShowRoute = App.BaseRoute.extend({
var distances = [];
dc.coordinates.forEach(function (node) {
if (params.name == node.Node) {
var segment = node.Segment;
dc.coordinates.forEach(function (other) {
if (node.Node != other.Node) {
if (node.Node != other.Node && other.Segment == segment) {
var dist = distance(node, other);
distances.push({ node: other.Node, distance: dist });
distances.push({ node: other.Node, distance: dist, segment: segment });
sum += dist;
if (dist < min) {
min = dist;

View File

@ -44,6 +44,10 @@ The table below shows this endpoint's support for
members (which is the default). This is only eligible for agents running in
**server mode**. This is specified as part of the URL as a query parameter.
- `segment` `(string: "")` - (Enterprise-only) Specifies the segment to list members in. If left blank,
this will query for the default segment when connecting to a server and the agent's
own segment when connecting to a client (clients can only be part of one network segment).
### Sample Request
```text

View File

@ -108,6 +108,7 @@ $ curl \
[
{
"Node": "agent-one",
"Segment": "",
"Coord": {
"Adjustment": 0,
"Error": 1.5,
@ -117,3 +118,7 @@ $ curl \
}
]
```
In **Consul Enterprise**, this may include multiple coordinates for the same node,
each marked with a different `Segment`. Coordinates are only compatible within the same
segment.

View File

@ -0,0 +1,59 @@
---
layout: api
page_title: Network Segments - Operator - HTTP API
sidebar_current: api-operator-segment
description: |-
The /operator/segments endpoints expose the network segment information via
Consul's HTTP API.
---
# Network Areas - Operator HTTP API
The `/operator/segment` endpoints provide tools to manage network segments via
Consul's HTTP API.
~> **Enterprise-only!** This API endpoint and functionality only exists in
Consul Enterprise. This is not present in the open source version of Consul.
The network area functionality described here is available only in
[Consul Enterprise](https://www.hashicorp.com/products/consul/) version 0.9.3 and
later. Network segments are operator-defined sections of agents on the LAN, typically
isolated from other segments by network configuration.
Please see the [Network Segments Guide](/docs/guides/segments.html) for more details.
## List Network Segments
This endpoint lists all network areas.
| Method | Path | Produces |
| ------ | ---------------------------- | -------------------------- |
| `GET` | `/operator/segment/list` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api/index.html#blocking-queries),
[consistency modes](/api/index.html#consistency-modes), and
[required ACLs](/api/index.html#acls).
| Blocking Queries | Consistency Modes | ACL Required |
| ---------------- | ----------------- | --------------- |
| `NO` | `none` | `operator:read` |
### Parameters
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. This is specified as a URL query
parameter.
### Sample Request
```text
$ curl \
https://consul.rocks/v1/operator/segment/list
```
### Sample Response
```json
["","alpha","beta"]
```

View File

@ -87,6 +87,25 @@ populate the query before it is executed. All of the string fields inside the
doesn't match, or an invalid index is given, then `${match(N)}` will return an
empty string.
- `${agent.segment}` has the network segment (Enterprise-only) of the agent that
initiated the query. This can be used with the `NodeMeta` field to limit the results
of a query to service instances within its own network segment:
```json
{
"Name": "",
"Template": {
"Type": "name_prefix_match"
},
"Service": {
"Service": "${name.full}",
"NodeMeta": {"consul-network-segment": "${agent.segment}"}
}
}
```
This will map all names of the form "&lt;service&gt;.query.consul" over DNS to a query
that will select an instance of the service in the agent's own network segment.
Using templates, it is possible to apply prepared query behaviors to many
services with a single template. Here's an example template that matches any
query and applies a failover policy to it:

View File

@ -462,6 +462,11 @@ will exit with an error at startup.
as a permanent intent and does not attempt to join the cluster again when starting. This flag
allows the previous state to be used to rejoin the cluster.
* <a name="_segment"></a><a href="#_segment">`-segment`</a> - (Enterprise-only) This flag is used to set
the name of the network segment the agent belongs to. An agent can only join and communicate with other agents
within its network segment. See the [Network Segments Guide](/docs/guides/segments.html) for more details.
By default, this is an empty string, which is the default network segment.
* <a name="_server"></a><a href="#_server">`-server`</a> - This flag is used to control if an
agent is in server or client mode. When provided,
an agent will act as a Consul server. Each Consul cluster must have at least one server and ideally
@ -1052,6 +1057,22 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="retry_interval_wan"></a><a href="#retry_interval_wan">`retry_interval_wan`</a> Equivalent to the
[`-retry-interval-wan` command-line flag](#_retry_interval_wan).
* <a name="segment"></a><a href="#segment">`segment`</a> (Enterprise-only) Equivalent to the
[`-segment` command-line flag](#_segment).
* <a name="segments"></a><a href="#segments">`segments`</a> (Enterprise-only) This is a list of nested objects that allows setting
the bind/advertise information for network segments. This can only be set on servers.
* <a name="segment_name"></a><a href="#segment_name">`name`</a> - The name of the segment. Must be a string between
1 and 64 characters in length.
* <a name="segment_bind"></a><a href="#segment_bind">`bind`</a> - The bind address to use for the segment's gossip layer.
Defaults to the [`-bind`](#_bind) value if not provided.
* <a name="segment_port"></a><a href="#segment_port">`port`</a> - The port to use for the segment's gossip layer.
* <a name="segment_advertise"></a><a href="#segment_advertise">`advertise`</a> - The advertise address to use for the
segment's gossip layer. Defaults to the [`-advertise`](#_advertise) value if not provided.
* <a name="segment_rpc_listener"></a><a href="#segment_rpc_listener">`rpc_listener`</a> - If true, a separate RPC listener will
be started on this segment's [`-bind`](#_bind) address on the rpc port. Only valid if the segment's bind address differs from the
[`-bind`](#_bind) address. Defaults to false.
* <a name="server"></a><a href="#server">`server`</a> Equivalent to the
[`-server` command-line flag](#_server).

View File

@ -31,6 +31,9 @@ Usage: `consul members [options]`
* `-detailed` - If provided, output shows more detailed information
about each node.
* `-segment` - (Enterprise-only) The segment to show members in. If not provided, members
in all segments visible to the agent will be listed.
* `-status` - If provided, output is filtered to only nodes matching
the regular expression for status

View File

@ -18,6 +18,7 @@ increases both scalability and resilience. Features include:
- [Redundancy Zones](/docs/enterprise/redundancy/index.html)
- [Advanced Federation for Complex Network
Topologies](/docs/enterprise/federation/index.html)
- [Network Segments](/docs/guides/segments.html)
These features are part of [Consul
Enterprise](https://www.hashicorp.com/consul.html).

View File

@ -36,6 +36,8 @@ The following guides are available:
* [Leader Election](/docs/guides/leader-election.html) - The goal of this guide is to cover how to build client-side leader election using Consul.
* [Network Segments](/docs/guides/segments.html) - Configuring Consul to support partial LAN connectivity using Network Segments.
* [Outage Recovery](/docs/guides/outage.html) - This guide covers recovering a cluster that has become unavailable due to server failures.
* [Semaphore](/docs/guides/semaphore.html) - This guide covers using the KV store to implement a semaphore.

View File

@ -0,0 +1,231 @@
---
layout: "docs"
page_title: "Partial LAN Connectivity - Configuring Network Segments"
sidebar_current: "docs-guides-areas"
description: |-
Many advanced Consul users have the need to run clusters with segmented networks, meaning that
not all agents can be in a full mesh. This is usually the result of business policies enforced
via network rules or firewalls. Prior to Consul 0.9.3 this was only possible through federation,
which for some users is too heavyweight or expensive as it requires running multiple servers per
segment.
---
# Partial LAN Connectivity
## Configuring Network Segments
[//]: # ( ~> The network segment functionality described here is available only in )
[//]: # ( [Consul Enterprise](https://www.hashicorp.com/products/consul/) version 0.9.3 and later. )
<%= enterprise_alert :consul %>
Many advanced Consul users have the need to run clusters with segmented networks, meaning that
not all agents can be in a full mesh. This is usually the result of business policies enforced
via network rules or firewalls. Prior to Consul 0.9.3 this was only possible through federation,
which for some users is too heavyweight or expensive as it requires running multiple servers per
segment.
By default, all Consul agents in one datacenter are part of a shared gossip pool over the LAN;
this means that the partial connectivity caused by segmented networks would cause health flapping
as nodes failed to communicate. In this guide we will cover the Network Segments feature, added
in [Consul Enterprise](https://www.hashicorp.com/products/consul/) version 0.9.3, which allows users
to configure Consul to support this kind of segmented network topology.
This guide will cover the basic configuration for setting up multiple segments, as well as
how to configure a prepared query to limit service discovery to the services in the local agent's
network segment.
## Network Segments
All Consul agents are part of the default network segment, `""`, unless a segment is specified in
their configuration. In a standard cluster setup all agents will normally be part of this default
segment and as a result, part of one shared LAN gossip pool. Network segments can be used to break
up the LAN gossip pool into multiple isolated smaller pools by specifying the configuration for segments
on the servers. Each desired segment must be given a name and port, as well as optionally a custom
bind and advertise address for that segment's gossip listener to bind to on the server.
A few things to note:
1. Servers will be a part of all segments they have been configured with. They are the common point
linking the different segments together.
2. Client agents can only be part of one segment at a given time, specified by the [`-segment`]
(/docs/agent/options.html#_segment) option.
3. Clients can only join agents in the same segment as them. If they attempt to join a client in
another segment, or the listening port of another segment on a server, they will get a segment mismatch error.
Once the servers have been configured with the correct segment info, the clients only need to specify
their own segment in the [Agent Config](/docs/agent/options.html) and join by connecting to another
agent within the same segment. If joining to a Consul server, client will need to provide the server's
port for their segment along with the address of the server when performing the join (for example,
`consul agent -retry-join "consul.domain.internal:1234"`).
## Getting Started
To get started, follow the [bootstrapping guide](/docs/guides/bootstrapping.html) to
start a server or group of servers, with the following section added to the configuration (you may need to
adjust the bind/advertise addresses for your setup):
```json
{
...
"segments": [
{"name": "alpha", "bind": "{{GetPrivateIP}}", "advertise": "{{GetPrivateIP}}", "port": 8303},
{"name": "beta", "bind": "{{GetPrivateIP}}", "advertise": "{{GetPrivateIP}}", "port": 8304}
]
}
```
You should see a log message on the servers for each segment's listener as the agent starts up:
```text
2017/08/30 19:05:13 [INFO] serf: EventMemberJoin: server1.dc1 192.168.0.4
2017/08/30 19:05:13 [INFO] serf: EventMemberJoin: server1 192.168.0.4
2017/08/30 19:05:13 [INFO] consul: Started listener for LAN segment "alpha" on 192.168.0.4:8303
2017/08/30 19:05:13 [INFO] serf: EventMemberJoin: server1 192.168.0.4
2017/08/30 19:05:13 [INFO] consul: Started listener for LAN segment "beta" on 192.168.0.4:8304
2017/08/30 19:05:13 [INFO] serf: EventMemberJoin: server1 192.168.0.4
```
Running `consul members` should show the server as being part of all segments:
```text
(server1) $ consul members
Node Address Status Type Build Protocol DC Segment
server1 192.168.0.4:8301 alive server 0.9.3+ent 2 dc1 <all>
```
Next, start a client agent in the 'alpha' segment, with `-join` set to the server's segment
address/port for that segment:
```text
(client1) $ consul agent ... -join 192.168.0.4:8303 -node client1 -segment alpha
```
After the join is successful, we should see the client show up by running the `consul members` command
on the server again:
```text
(server1) $ consul members
Node Address Status Type Build Protocol DC Segment
server1 192.168.0.4:8301 alive server 0.9.3+ent 2 dc1 <all>
client1 192.168.0.5:8301 alive client 0.9.3+ent 2 dc1 alpha
```
Now join another client in segment 'beta' and run the `consul members` command another time:
```text
(client2) $ consul agent ... -join 192.168.0.4:8304 -node client2 -segment beta
```
```text
(server1) $ consul members
Node Address Status Type Build Protocol DC Segment
server1 192.168.0.4:8301 alive server 0.9.3+ent 2 dc1 <all>
client1 192.168.0.5:8301 alive client 0.9.3+ent 2 dc1 alpha
client2 192.168.0.6:8301 alive client 0.9.3+ent 2 dc1 beta
```
If we pass the `-segment` flag when running `consul members`, we can limit the view to agents
in a specific segment:
```text
(server1) $ consul members -segment alpha
Node Address Status Type Build Protocol DC Segment
client1 192.168.0.5:8301 alive client 0.9.3+ent 2 dc1 alpha
server1 192.168.0.4:8303 alive server 0.9.3+ent 2 dc1 alpha
```
Using the `consul catalog nodes` command, we can filter on an internal metadata key,
`consul-network-segment`, which stores the network segment of the node:
```text
(server1) $ consul catalog nodes -node-meta consul-network-segment=alpha
Node ID Address DC
client1 4c29819c 192.168.0.5 dc1
```
With this metadata key, we can construct a [Prepared Query](/api/query.html) that can be used
for DNS to return only services within the same network segment as the local agent.
First, register a service on each of the client nodes:
```text
(client1) $ curl \
--request PUT \
--data '{"Name": "redis", "Port": 8000}' \
localhost:8500/v1/agent/service/register
```
```text
(client2) $ curl \
--request PUT \
--data '{"Name": "redis", "Port": 9000}' \
localhost:8500/v1/agent/service/register
```
Next, write the following to `query.json` and create the query using the HTTP endpoint:
```text
(server1) $ curl \
--request POST \
--data \
'{
"Name": "",
"Template": {
"Type": "name_prefix_match"
},
"Service": {
"Service": "${name.full}",
"NodeMeta": {"consul-network-segment": "${agent.segment}"}
}
}' localhost:8500/v1/query
{"ID":"6f49dd24-de9b-0b6c-fd29-525eca069419"}
```
Now, we can replace any dns lookups of the form `<service>.service.consul` with
`<service>.query.consul` to look up only services within the same network segment:
**Client 1:**
```text
(client1) $ dig @127.0.0.1 -p 8600 redis.query.consul SRV
; <<>> DiG 9.8.3-P1 <<>> @127.0.0.1 -p 8600 redis.query.consul SRV
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 3149
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available
;; QUESTION SECTION:
;redis.query.consul. IN SRV
;; ANSWER SECTION:
redis.query.consul. 0 IN SRV 1 1 8000 client1.node.dc1.consul.
;; ADDITIONAL SECTION:
client1.node.dc1.consul. 0 IN A 192.168.0.5
```
**Client 2:**
```text
(client2) $ dig @127.0.0.1 -p 8600 redis.query.consul SRV
; <<>> DiG 9.8.3-P1 <<>> @127.0.0.1 -p 8600 redis.query.consul SRV
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 3149
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available
;; QUESTION SECTION:
;redis.query.consul. IN SRV
;; ANSWER SECTION:
redis.query.consul. 0 IN SRV 1 1 9000 client2.node.dc1.consul.
;; ADDITIONAL SECTION:
client2.node.dc1.consul. 0 IN A 192.168.0.6
```

View File

@ -54,6 +54,9 @@
<li<%= sidebar_current("api-operator-raft") %>>
<a href="/api/operator/raft.html">Raft</a>
</li>
<li<%= sidebar_current("api-operator-segment") %>>
<a href="/api/operator/segment.html">Segment</a>
</li>
</ul>
</li>
<li<%= sidebar_current("api-query") %>>

View File

@ -251,6 +251,9 @@
<li<%= sidebar_current("docs-guides-leader") %>>
<a href="/docs/guides/leader-election.html">Leader Election</a>
</li>
<li<%= sidebar_current("docs-guides-segments") %>>
<a href="/docs/guides/segments.html">Network Segments</a>
</li>
<li<%= sidebar_current("docs-guides-outage") %>>
<a href="/docs/guides/outage.html">Outage Recovery</a>
</li>