consul/command/agent/command.go
Frank Schroeder b6c69ebf5d
agent: refactor DNS and HTTP server
* refactor DNS server to be ready for multiple bind addresses
* drop tcpKeepAliveListener since it is default for the HTTP servers
* add startup timeout watcher for HTTP servers identical to DNS server
2017-05-31 00:29:29 +02:00

1048 lines
34 KiB
Go

package agent
import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"os/signal"
"path/filepath"
"regexp"
"strings"
"syscall"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/circonus"
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/command/base"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-checkpoint"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/logutils"
"github.com/mitchellh/cli"
)
// gracefulTimeout controls how long we wait before forcefully terminating
var gracefulTimeout = 5 * time.Second
// validDatacenter is used to validate a datacenter
var validDatacenter = regexp.MustCompile("^[a-zA-Z0-9_-]+$")
// Command is a Command implementation that runs a Consul agent.
// The command will not end unless a shutdown message is sent on the
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
// exit.
type Command struct {
base.Command
Revision string
Version string
VersionPrerelease string
HumanVersion string
ShutdownCh <-chan struct{}
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
}
// readConfig is responsible for setup of our configuration using
// the command line and any file configs
func (cmd *Command) readConfig() *Config {
var cmdCfg Config
var cfgFiles []string
var retryInterval string
var retryIntervalWan string
var dnsRecursors []string
var dev bool
var nodeMeta []string
f := cmd.Command.NewFlagSet(cmd)
f.Var((*AppendSliceValue)(&cfgFiles), "config-file",
"Path to a JSON file to read configuration from. This can be specified multiple times.")
f.Var((*AppendSliceValue)(&cfgFiles), "config-dir",
"Path to a directory to read configuration files from. This will read every file ending "+
"in '.json' as configuration in this directory in alphabetical order. This can be "+
"specified multiple times.")
f.Var((*AppendSliceValue)(&dnsRecursors), "recursor",
"Address of an upstream DNS server. Can be specified multiple times.")
f.Var((*AppendSliceValue)(&nodeMeta), "node-meta",
"An arbitrary metadata key/value pair for this node, of the format `key:value`. Can be specified multiple times.")
f.BoolVar(&dev, "dev", false, "Starts the agent in development mode.")
f.StringVar(&cmdCfg.LogLevel, "log-level", "", "Log level of the agent.")
f.StringVar(&cmdCfg.NodeName, "node", "", "Name of this node. Must be unique in the cluster.")
f.StringVar((*string)(&cmdCfg.NodeID), "node-id", "",
"A unique ID for this node across space and time. Defaults to a randomly-generated ID"+
" that persists in the data-dir.")
f.BoolVar(&cmdCfg.DisableHostNodeID, "disable-host-node-id", false,
"Setting this to true will prevent Consul from using information from the"+
" host to generate a node ID, and will cause Consul to generate a"+
" random node ID instead.")
f.StringVar(&cmdCfg.Datacenter, "datacenter", "", "Datacenter of the agent.")
f.StringVar(&cmdCfg.DataDir, "data-dir", "", "Path to a data directory to store agent state.")
f.BoolVar(&cmdCfg.EnableUI, "ui", false, "Enables the built-in static web UI server.")
f.StringVar(&cmdCfg.UIDir, "ui-dir", "", "Path to directory containing the web UI resources.")
f.StringVar(&cmdCfg.PidFile, "pid-file", "", "Path to file to store agent PID.")
f.StringVar(&cmdCfg.EncryptKey, "encrypt", "", "Provides the gossip encryption key.")
f.BoolVar(&cmdCfg.Server, "server", false, "Switches agent to server mode.")
f.BoolVar(&cmdCfg.NonVotingServer, "non-voting-server", false,
"(Enterprise-only) This flag is used to make the server not participate in the Raft quorum, "+
"and have it only receive the data replication stream. This can be used to add read scalability "+
"to a cluster in cases where a high volume of reads to servers are needed.")
f.BoolVar(&cmdCfg.Bootstrap, "bootstrap", false, "Sets server to bootstrap mode.")
f.IntVar(&cmdCfg.BootstrapExpect, "bootstrap-expect", 0, "Sets server to expect bootstrap mode.")
f.StringVar(&cmdCfg.Domain, "domain", "", "Domain to use for DNS interface.")
f.StringVar(&cmdCfg.ClientAddr, "client", "",
"Sets the address to bind for client access. This includes RPC, DNS, HTTP and HTTPS (if configured).")
f.StringVar(&cmdCfg.BindAddr, "bind", "", "Sets the bind address for cluster communication.")
f.StringVar(&cmdCfg.SerfWanBindAddr, "serf-wan-bind", "", "Address to bind Serf WAN listeners to.")
f.StringVar(&cmdCfg.SerfLanBindAddr, "serf-lan-bind", "", "Address to bind Serf LAN listeners to.")
f.IntVar(&cmdCfg.Ports.HTTP, "http-port", 0, "Sets the HTTP API port to listen on.")
f.IntVar(&cmdCfg.Ports.DNS, "dns-port", 0, "DNS port to use.")
f.StringVar(&cmdCfg.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.")
f.StringVar(&cmdCfg.AdvertiseAddrWan, "advertise-wan", "",
"Sets address to advertise on WAN instead of -advertise address.")
f.IntVar(&cmdCfg.Protocol, "protocol", -1,
"Sets the protocol version. Defaults to latest.")
f.IntVar(&cmdCfg.RaftProtocol, "raft-protocol", -1,
"Sets the Raft protocol version. Defaults to latest.")
f.BoolVar(&cmdCfg.EnableSyslog, "syslog", false,
"Enables logging to syslog.")
f.BoolVar(&cmdCfg.RejoinAfterLeave, "rejoin", false,
"Ignores a previous leave and attempts to rejoin the cluster.")
f.Var((*AppendSliceValue)(&cmdCfg.StartJoin), "join",
"Address of an agent to join at start time. Can be specified multiple times.")
f.Var((*AppendSliceValue)(&cmdCfg.StartJoinWan), "join-wan",
"Address of an agent to join -wan at start time. Can be specified multiple times.")
f.Var((*AppendSliceValue)(&cmdCfg.RetryJoin), "retry-join",
"Address of an agent to join at start time with retries enabled. Can be specified multiple times.")
f.IntVar(&cmdCfg.RetryMaxAttempts, "retry-max", 0,
"Maximum number of join attempts. Defaults to 0, which will retry indefinitely.")
f.StringVar(&retryInterval, "retry-interval", "",
"Time to wait between join attempts.")
f.StringVar(&cmdCfg.RetryJoinEC2.Region, "retry-join-ec2-region", "",
"EC2 Region to discover servers in.")
f.StringVar(&cmdCfg.RetryJoinEC2.TagKey, "retry-join-ec2-tag-key", "",
"EC2 tag key to filter on for server discovery.")
f.StringVar(&cmdCfg.RetryJoinEC2.TagValue, "retry-join-ec2-tag-value", "",
"EC2 tag value to filter on for server discovery.")
f.StringVar(&cmdCfg.RetryJoinGCE.ProjectName, "retry-join-gce-project-name", "",
"Google Compute Engine project to discover servers in.")
f.StringVar(&cmdCfg.RetryJoinGCE.ZonePattern, "retry-join-gce-zone-pattern", "",
"Google Compute Engine region or zone to discover servers in (regex pattern).")
f.StringVar(&cmdCfg.RetryJoinGCE.TagValue, "retry-join-gce-tag-value", "",
"Google Compute Engine tag value to filter on for server discovery.")
f.StringVar(&cmdCfg.RetryJoinGCE.CredentialsFile, "retry-join-gce-credentials-file", "",
"Path to credentials JSON file to use with Google Compute Engine.")
f.StringVar(&cmdCfg.RetryJoinAzure.TagName, "retry-join-azure-tag-name", "",
"Azure tag name to filter on for server discovery.")
f.StringVar(&cmdCfg.RetryJoinAzure.TagValue, "retry-join-azure-tag-value", "",
"Azure tag value to filter on for server discovery.")
f.Var((*AppendSliceValue)(&cmdCfg.RetryJoinWan), "retry-join-wan",
"Address of an agent to join -wan at start time with retries enabled. "+
"Can be specified multiple times.")
f.IntVar(&cmdCfg.RetryMaxAttemptsWan, "retry-max-wan", 0,
"Maximum number of join -wan attempts. Defaults to 0, which will retry indefinitely.")
f.StringVar(&retryIntervalWan, "retry-interval-wan", "",
"Time to wait between join -wan attempts.")
// deprecated flags
var dcDeprecated string
var atlasJoin bool
var atlasInfrastructure, atlasToken, atlasEndpoint string
f.StringVar(&dcDeprecated, "dc", "",
"(deprecated) Datacenter of the agent (use 'datacenter' instead).")
f.StringVar(&atlasInfrastructure, "atlas", "",
"(deprecated) Sets the Atlas infrastructure name, enables SCADA.")
f.StringVar(&atlasToken, "atlas-token", "",
"(deprecated) Provides the Atlas API token.")
f.BoolVar(&atlasJoin, "atlas-join", false,
"(deprecated) Enables auto-joining the Atlas cluster.")
f.StringVar(&atlasEndpoint, "atlas-endpoint", "",
"(deprecated) The address of the endpoint for Atlas integration.")
if err := cmd.Command.Parse(cmd.args); err != nil {
return nil
}
// check deprecated flags
if atlasInfrastructure != "" {
cmd.UI.Warn("WARNING: 'atlas' is deprecated")
}
if atlasToken != "" {
cmd.UI.Warn("WARNING: 'atlas-token' is deprecated")
}
if atlasJoin {
cmd.UI.Warn("WARNING: 'atlas-join' is deprecated")
}
if atlasEndpoint != "" {
cmd.UI.Warn("WARNING: 'atlas-endpoint' is deprecated")
}
if dcDeprecated != "" && cmdCfg.Datacenter == "" {
cmd.UI.Warn("WARNING: 'dc' is deprecated. Use 'datacenter' instead")
cmdCfg.Datacenter = dcDeprecated
}
if retryInterval != "" {
dur, err := time.ParseDuration(retryInterval)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdCfg.RetryInterval = dur
}
if retryIntervalWan != "" {
dur, err := time.ParseDuration(retryIntervalWan)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdCfg.RetryIntervalWan = dur
}
if len(nodeMeta) > 0 {
cmdCfg.Meta = make(map[string]string)
for _, entry := range nodeMeta {
key, value := parseMetaPair(entry)
cmdCfg.Meta[key] = value
}
}
cfg := DefaultConfig()
if dev {
cfg = DevConfig()
}
if len(cfgFiles) > 0 {
fileConfig, err := ReadConfigPaths(cfgFiles)
if err != nil {
cmd.UI.Error(err.Error())
return nil
}
cfg = MergeConfig(cfg, fileConfig)
}
cmdCfg.DNSRecursors = append(cmdCfg.DNSRecursors, dnsRecursors...)
cfg = MergeConfig(cfg, &cmdCfg)
if cfg.NodeName == "" {
hostname, err := os.Hostname()
if err != nil {
cmd.UI.Error(fmt.Sprintf("Error determining node name: %s", err))
return nil
}
cfg.NodeName = hostname
}
cfg.NodeName = strings.TrimSpace(cfg.NodeName)
if cfg.NodeName == "" {
cmd.UI.Error("Node name can not be empty")
return nil
}
// Make sure LeaveOnTerm and SkipLeaveOnInt are set to the right
// defaults based on the agent's mode (client or server).
if cfg.LeaveOnTerm == nil {
cfg.LeaveOnTerm = Bool(!cfg.Server)
}
if cfg.SkipLeaveOnInt == nil {
cfg.SkipLeaveOnInt = Bool(cfg.Server)
}
// Ensure we have a data directory if we are not in dev mode.
if !dev {
if cfg.DataDir == "" {
cmd.UI.Error("Must specify data directory using -data-dir")
return nil
}
if finfo, err := os.Stat(cfg.DataDir); err != nil {
if !os.IsNotExist(err) {
cmd.UI.Error(fmt.Sprintf("Error getting data-dir: %s", err))
return nil
}
} else if !finfo.IsDir() {
cmd.UI.Error(fmt.Sprintf("The data-dir specified at %q is not a directory", cfg.DataDir))
return nil
}
}
// Ensure all endpoints are unique
if err := cfg.verifyUniqueListeners(); err != nil {
cmd.UI.Error(fmt.Sprintf("All listening endpoints must be unique: %s", err))
return nil
}
// Check the data dir for signs of an un-migrated Consul 0.5.x or older
// server. Consul refuses to start if this is present to protect a server
// with existing data from starting on a fresh data set.
if cfg.Server {
mdbPath := filepath.Join(cfg.DataDir, "mdb")
if _, err := os.Stat(mdbPath); !os.IsNotExist(err) {
if os.IsPermission(err) {
cmd.UI.Error(fmt.Sprintf("CRITICAL: Permission denied for data folder at %q!", mdbPath))
cmd.UI.Error("Consul will refuse to boot without access to this directory.")
cmd.UI.Error("Please correct permissions and try starting again.")
return nil
}
cmd.UI.Error(fmt.Sprintf("CRITICAL: Deprecated data folder found at %q!", mdbPath))
cmd.UI.Error("Consul will refuse to boot with this directory present.")
cmd.UI.Error("See https://www.consul.io/docs/upgrade-specific.html for more information.")
return nil
}
}
// Verify DNS settings
if cfg.DNSConfig.UDPAnswerLimit < 1 {
cmd.UI.Error(fmt.Sprintf("dns_config.udp_answer_limit %d too low, must always be greater than zero", cfg.DNSConfig.UDPAnswerLimit))
}
if cfg.EncryptKey != "" {
if _, err := cfg.EncryptBytes(); err != nil {
cmd.UI.Error(fmt.Sprintf("Invalid encryption key: %s", err))
return nil
}
keyfileLAN := filepath.Join(cfg.DataDir, serfLANKeyring)
if _, err := os.Stat(keyfileLAN); err == nil {
cmd.UI.Error("WARNING: LAN keyring exists but -encrypt given, using keyring")
}
if cfg.Server {
keyfileWAN := filepath.Join(cfg.DataDir, serfWANKeyring)
if _, err := os.Stat(keyfileWAN); err == nil {
cmd.UI.Error("WARNING: WAN keyring exists but -encrypt given, using keyring")
}
}
}
// Ensure the datacenter is always lowercased. The DNS endpoints automatically
// lowercase all queries, and internally we expect DC1 and dc1 to be the same.
cfg.Datacenter = strings.ToLower(cfg.Datacenter)
// Verify datacenter is valid
if !validDatacenter.MatchString(cfg.Datacenter) {
cmd.UI.Error("Datacenter must be alpha-numeric with underscores and hypens only")
return nil
}
// If 'acl_datacenter' is set, ensure it is lowercased.
if cfg.ACLDatacenter != "" {
cfg.ACLDatacenter = strings.ToLower(cfg.ACLDatacenter)
// Verify 'acl_datacenter' is valid
if !validDatacenter.MatchString(cfg.ACLDatacenter) {
cmd.UI.Error("ACL datacenter must be alpha-numeric with underscores and hypens only")
return nil
}
}
// Only allow bootstrap mode when acting as a server
if cfg.Bootstrap && !cfg.Server {
cmd.UI.Error("Bootstrap mode cannot be enabled when server mode is not enabled")
return nil
}
// Expect can only work when acting as a server
if cfg.BootstrapExpect != 0 && !cfg.Server {
cmd.UI.Error("Expect mode cannot be enabled when server mode is not enabled")
return nil
}
// Expect can only work when dev mode is off
if cfg.BootstrapExpect > 0 && cfg.DevMode {
cmd.UI.Error("Expect mode cannot be enabled when dev mode is enabled")
return nil
}
// Expect & Bootstrap are mutually exclusive
if cfg.BootstrapExpect != 0 && cfg.Bootstrap {
cmd.UI.Error("Bootstrap cannot be provided with an expected server count")
return nil
}
if ipaddr.IsAny(cfg.AdvertiseAddr) {
cmd.UI.Error("Advertise address cannot be " + cfg.AdvertiseAddr)
return nil
}
if ipaddr.IsAny(cfg.AdvertiseAddrWan) {
cmd.UI.Error("Advertise WAN address cannot be " + cfg.AdvertiseAddrWan)
return nil
}
// Compile all the watches
for _, params := range cfg.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}
// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}
// Store the watch plan
cfg.WatchPlans = append(cfg.WatchPlans, wp)
}
// Warn if we are in expect mode
if cfg.BootstrapExpect == 1 {
cmd.UI.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
cfg.BootstrapExpect = 0
cfg.Bootstrap = true
} else if cfg.BootstrapExpect > 0 {
cmd.UI.Error(fmt.Sprintf("WARNING: Expect Mode enabled, expecting %d servers", cfg.BootstrapExpect))
}
// Warn if we are in bootstrap mode
if cfg.Bootstrap {
cmd.UI.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary")
}
// Need both tag key and value for EC2 discovery
if cfg.RetryJoinEC2.TagKey != "" || cfg.RetryJoinEC2.TagValue != "" {
if cfg.RetryJoinEC2.TagKey == "" || cfg.RetryJoinEC2.TagValue == "" {
cmd.UI.Error("tag key and value are both required for EC2 retry-join")
return nil
}
}
// EC2 and GCE discovery are mutually exclusive
if cfg.RetryJoinEC2.TagKey != "" && cfg.RetryJoinEC2.TagValue != "" && cfg.RetryJoinGCE.TagValue != "" {
cmd.UI.Error("EC2 and GCE discovery are mutually exclusive. Please provide one or the other.")
return nil
}
// Verify the node metadata entries are valid
if err := structs.ValidateMetadata(cfg.Meta); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
}
// It doesn't make sense to include both UI options.
if cfg.EnableUI == true && cfg.UIDir != "" {
cmd.UI.Error("Both the ui and ui-dir flags were specified, please provide only one")
cmd.UI.Error("If trying to use your own web UI resources, use the ui-dir flag")
cmd.UI.Error("If using Consul version 0.7.0 or later, the web UI is included in the binary so use ui to enable it")
return nil
}
// Set the version info
cfg.Revision = cmd.Revision
cfg.Version = cmd.Version
cfg.VersionPrerelease = cmd.VersionPrerelease
return cfg
}
// checkpointResults is used to handler periodic results from our update checker
func (cmd *Command) checkpointResults(results *checkpoint.CheckResponse, err error) {
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to check for updates: %v", err))
return
}
if results.Outdated {
cmd.UI.Error(fmt.Sprintf("Newer Consul version available: %s (currently running: %s)", results.CurrentVersion, cmd.Version))
}
for _, alert := range results.Alerts {
switch alert.Level {
case "info":
cmd.UI.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
default:
cmd.UI.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
}
}
}
// startupJoin is invoked to handle any joins specified to take place at start time
func (cmd *Command) startupJoin(cfg *Config) error {
if len(cfg.StartJoin) == 0 {
return nil
}
cmd.UI.Output("Joining cluster...")
n, err := cmd.agent.JoinLAN(cfg.StartJoin)
if err != nil {
return err
}
cmd.UI.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
return nil
}
// startupJoinWan is invoked to handle any joins -wan specified to take place at start time
func (cmd *Command) startupJoinWan(cfg *Config) error {
if len(cfg.StartJoinWan) == 0 {
return nil
}
cmd.UI.Output("Joining -wan cluster...")
n, err := cmd.agent.JoinWAN(cfg.StartJoinWan)
if err != nil {
return err
}
cmd.UI.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n))
return nil
}
// retryJoin is used to handle retrying a join until it succeeds or all
// retries are exhausted.
func (cmd *Command) retryJoin(cfg *Config, errCh chan<- struct{}) {
ec2Enabled := cfg.RetryJoinEC2.TagKey != "" && cfg.RetryJoinEC2.TagValue != ""
gceEnabled := cfg.RetryJoinGCE.TagValue != ""
azureEnabled := cfg.RetryJoinAzure.TagName != "" && cfg.RetryJoinAzure.TagValue != ""
if len(cfg.RetryJoin) == 0 && !ec2Enabled && !gceEnabled && !azureEnabled {
return
}
logger := cmd.agent.logger
logger.Printf("[INFO] agent: Joining cluster...")
attempt := 0
for {
var servers []string
var err error
switch {
case ec2Enabled:
servers, err = cfg.discoverEc2Hosts(logger)
if err != nil {
logger.Printf("[ERROR] agent: Unable to query EC2 instances: %s", err)
}
logger.Printf("[INFO] agent: Discovered %d servers from EC2", len(servers))
case gceEnabled:
servers, err = cfg.discoverGCEHosts(logger)
if err != nil {
logger.Printf("[ERROR] agent: Unable to query GCE instances: %s", err)
}
logger.Printf("[INFO] agent: Discovered %d servers from GCE", len(servers))
case azureEnabled:
servers, err = cfg.discoverAzureHosts(logger)
if err != nil {
logger.Printf("[ERROR] agent: Unable to query Azure instances: %s", err)
}
logger.Printf("[INFO] agent: Discovered %d servers from Azure", len(servers))
}
servers = append(servers, cfg.RetryJoin...)
if len(servers) == 0 {
err = fmt.Errorf("No servers to join")
} else {
n, err := cmd.agent.JoinLAN(servers)
if err == nil {
logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
}
attempt++
if cfg.RetryMaxAttempts > 0 && attempt > cfg.RetryMaxAttempts {
logger.Printf("[ERROR] agent: max join retry exhausted, exiting")
close(errCh)
return
}
logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
cfg.RetryInterval)
time.Sleep(cfg.RetryInterval)
}
}
// retryJoinWan is used to handle retrying a join -wan until it succeeds or all
// retries are exhausted.
func (cmd *Command) retryJoinWan(cfg *Config, errCh chan<- struct{}) {
if len(cfg.RetryJoinWan) == 0 {
return
}
logger := cmd.agent.logger
logger.Printf("[INFO] agent: Joining WAN cluster...")
attempt := 0
for {
n, err := cmd.agent.JoinWAN(cfg.RetryJoinWan)
if err == nil {
logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n)
return
}
attempt++
if cfg.RetryMaxAttemptsWan > 0 && attempt > cfg.RetryMaxAttemptsWan {
logger.Printf("[ERROR] agent: max join -wan retry exhausted, exiting")
close(errCh)
return
}
logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err,
cfg.RetryIntervalWan)
time.Sleep(cfg.RetryIntervalWan)
}
}
// gossipEncrypted determines if the consul instance is using symmetric
// encryption keys to protect gossip protocol messages.
func (cmd *Command) gossipEncrypted() bool {
if cmd.agent.config.EncryptKey != "" {
return true
}
server, ok := cmd.agent.delegate.(*consul.Server)
if ok {
return server.KeyManagerLAN() != nil || server.KeyManagerWAN() != nil
}
client, ok := cmd.agent.delegate.(*consul.Client)
if ok {
return client != nil && client.KeyManagerLAN() != nil
}
panic(fmt.Sprintf("delegate is neither server nor client: %T", cmd.agent.delegate))
}
func (cmd *Command) Run(args []string) int {
cmd.UI = &cli.PrefixedUi{
OutputPrefix: "==> ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: cmd.UI,
}
// Parse our configs
cmd.args = args
config := cmd.readConfig()
if config == nil {
return 1
}
// Setup the log outputs
logConfig := &logger.Config{
LogLevel: config.LogLevel,
EnableSyslog: config.EnableSyslog,
SyslogFacility: config.SyslogFacility,
}
logFilter, logGate, logWriter, logOutput, ok := logger.Setup(logConfig, cmd.UI)
if !ok {
return 1
}
cmd.logFilter = logFilter
cmd.logOutput = logOutput
// Setup telemetry
// Aggregate on 10 second intervals for 1 minute. Expose the
// metrics over stderr when there is a SIGUSR1 received.
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
metrics.DefaultInmemSignal(inm)
metricsConf := metrics.DefaultConfig(config.Telemetry.StatsitePrefix)
metricsConf.EnableHostname = !config.Telemetry.DisableHostname
// Configure the statsite sink
var fanout metrics.FanoutSink
if config.Telemetry.StatsiteAddr != "" {
sink, err := metrics.NewStatsiteSink(config.Telemetry.StatsiteAddr)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to start statsite sink. Got: %s", err))
return 1
}
fanout = append(fanout, sink)
}
// Configure the statsd sink
if config.Telemetry.StatsdAddr != "" {
sink, err := metrics.NewStatsdSink(config.Telemetry.StatsdAddr)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to start statsd sink. Got: %s", err))
return 1
}
fanout = append(fanout, sink)
}
// Configure the DogStatsd sink
if config.Telemetry.DogStatsdAddr != "" {
var tags []string
if config.Telemetry.DogStatsdTags != nil {
tags = config.Telemetry.DogStatsdTags
}
sink, err := datadog.NewDogStatsdSink(config.Telemetry.DogStatsdAddr, metricsConf.HostName)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to start DogStatsd sink. Got: %s", err))
return 1
}
sink.SetTags(tags)
fanout = append(fanout, sink)
}
if config.Telemetry.CirconusAPIToken != "" || config.Telemetry.CirconusCheckSubmissionURL != "" {
cfg := &circonus.Config{}
cfg.Interval = config.Telemetry.CirconusSubmissionInterval
cfg.CheckManager.API.TokenKey = config.Telemetry.CirconusAPIToken
cfg.CheckManager.API.TokenApp = config.Telemetry.CirconusAPIApp
cfg.CheckManager.API.URL = config.Telemetry.CirconusAPIURL
cfg.CheckManager.Check.SubmissionURL = config.Telemetry.CirconusCheckSubmissionURL
cfg.CheckManager.Check.ID = config.Telemetry.CirconusCheckID
cfg.CheckManager.Check.ForceMetricActivation = config.Telemetry.CirconusCheckForceMetricActivation
cfg.CheckManager.Check.InstanceID = config.Telemetry.CirconusCheckInstanceID
cfg.CheckManager.Check.SearchTag = config.Telemetry.CirconusCheckSearchTag
cfg.CheckManager.Check.DisplayName = config.Telemetry.CirconusCheckDisplayName
cfg.CheckManager.Check.Tags = config.Telemetry.CirconusCheckTags
cfg.CheckManager.Broker.ID = config.Telemetry.CirconusBrokerID
cfg.CheckManager.Broker.SelectTag = config.Telemetry.CirconusBrokerSelectTag
if cfg.CheckManager.Check.DisplayName == "" {
cfg.CheckManager.Check.DisplayName = "Consul"
}
if cfg.CheckManager.API.TokenApp == "" {
cfg.CheckManager.API.TokenApp = "consul"
}
if cfg.CheckManager.Check.SearchTag == "" {
cfg.CheckManager.Check.SearchTag = "service:consul"
}
sink, err := circonus.NewCirconusSink(cfg)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to start Circonus sink. Got: %s", err))
return 1
}
sink.Start()
fanout = append(fanout, sink)
}
// Initialize the global sink
if len(fanout) > 0 {
fanout = append(fanout, inm)
metrics.NewGlobal(metricsConf, fanout)
} else {
metricsConf.EnableHostname = false
metrics.NewGlobal(metricsConf, inm)
}
// Create the agent
cmd.UI.Output("Starting Consul agent...")
agent, err := NewAgent(config)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Error creating agent: %s", err))
return 1
}
agent.LogOutput = logOutput
agent.LogWriter = logWriter
if err := agent.Start(); err != nil {
cmd.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
return 1
}
cmd.agent = agent
// Setup update checking
if !config.DisableUpdateCheck {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
}
updateParams := &checkpoint.CheckParams{
Product: "consul",
Version: version,
}
if !config.DisableAnonymousSignature {
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, cmd.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
cmd.checkpointResults(checkpoint.Check(updateParams))
}()
}
defer cmd.agent.Shutdown()
// Join startup nodes if specified
if err := cmd.startupJoin(config); err != nil {
cmd.UI.Error(err.Error())
return 1
}
// Join startup nodes if specified
if err := cmd.startupJoinWan(config); err != nil {
cmd.UI.Error(err.Error())
return 1
}
// Get the new client http listener addr
var httpAddr net.Addr
if config.Ports.HTTP != -1 {
httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
} else if config.Ports.HTTPS != -1 {
httpAddr, err = config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS)
} else if len(config.WatchPlans) > 0 {
cmd.UI.Error("Error: cannot use watches if both HTTP and HTTPS are disabled")
return 1
}
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Register the watches
for _, wp := range config.WatchPlans {
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = cmd.logOutput
addr := httpAddr.String()
// If it's a unix socket, prefix with unix:// so the client initializes correctly
if httpAddr.Network() == "unix" {
addr = "unix://" + addr
}
if err := wp.Run(addr); err != nil {
cmd.UI.Error(fmt.Sprintf("Error running watch: %v", err))
}
}(wp)
}
// Figure out if gossip is encrypted
gossipEncrypted := cmd.agent.delegate.Encrypted()
// Let the agent know we've finished registration
cmd.agent.StartSync()
cmd.UI.Output("Consul agent running!")
cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion))
cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
cmd.UI.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS))
cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr,
config.Ports.SerfLan, config.Ports.SerfWan))
cmd.UI.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v",
gossipEncrypted, config.VerifyOutgoing, config.VerifyIncoming))
// Enable log streaming
cmd.UI.Info("")
cmd.UI.Output("Log data will now stream in as it occurs:\n")
logGate.Flush()
// Start retry join process
errCh := make(chan struct{})
go cmd.retryJoin(config, errCh)
// Start retry -wan join process
errWanCh := make(chan struct{})
go cmd.retryJoinWan(config, errWanCh)
// Wait for exit
return cmd.handleSignals(config, errCh, errWanCh)
}
// handleSignals blocks until we get an exit-causing signal
func (cmd *Command) handleSignals(cfg *Config, retryJoin <-chan struct{}, retryJoinWan <-chan struct{}) int {
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGPIPE)
// Wait for a signal
WAIT:
var sig os.Signal
var reloadErrCh chan error
select {
case s := <-signalCh:
sig = s
case ch := <-cmd.agent.reloadCh:
sig = syscall.SIGHUP
reloadErrCh = ch
case <-cmd.ShutdownCh:
sig = os.Interrupt
case <-retryJoin:
return 1
case <-retryJoinWan:
return 1
case <-cmd.agent.ShutdownCh():
// Agent is already shutdown!
return 0
}
// Skip SIGPIPE signals and skip logging whenever such signal is received as well
if sig == syscall.SIGPIPE {
goto WAIT
}
cmd.UI.Output(fmt.Sprintf("Caught signal: %v", sig))
// Check if this is a SIGHUP
if sig == syscall.SIGHUP {
conf, err := cmd.handleReload(cfg)
if conf != nil {
cfg = conf
}
if err != nil {
cmd.UI.Error(err.Error())
}
// Send result back if reload was called via HTTP
if reloadErrCh != nil {
reloadErrCh <- err
}
goto WAIT
}
// Check if we should do a graceful leave
graceful := false
if sig == os.Interrupt && !(*cfg.SkipLeaveOnInt) {
graceful = true
} else if sig == syscall.SIGTERM && (*cfg.LeaveOnTerm) {
graceful = true
}
// Bail fast if not doing a graceful leave
if !graceful {
return 1
}
// Attempt a graceful leave
gracefulCh := make(chan struct{})
cmd.UI.Output("Gracefully shutting down agent...")
go func() {
if err := cmd.agent.Leave(); err != nil {
cmd.UI.Error(fmt.Sprintf("Error: %s", err))
return
}
close(gracefulCh)
}()
// Wait for leave or another signal
select {
case <-signalCh:
return 1
case <-time.After(gracefulTimeout):
return 1
case <-gracefulCh:
return 0
}
}
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
func (cmd *Command) handleReload(cfg *Config) (*Config, error) {
cmd.UI.Output("Reloading configuration...")
var errs error
newConf := cmd.readConfig()
if newConf == nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs"))
return cfg, errs
}
// Change the log level
minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel))
if logger.ValidateLevelFilter(minLevel, cmd.logFilter) {
cmd.logFilter.SetMinLevel(minLevel)
} else {
errs = multierror.Append(fmt.Errorf(
"Invalid log level: %s. Valid log levels are: %v",
minLevel, cmd.logFilter.Levels))
// Keep the current log level
newConf.LogLevel = cfg.LogLevel
}
// Bulk update the services and checks
cmd.agent.PauseSync()
defer cmd.agent.ResumeSync()
// Snapshot the current state, and restore it afterwards
snap := cmd.agent.snapshotCheckState()
defer cmd.agent.restoreCheckState(snap)
// First unload all checks, services, and metadata. This lets us begin the reload
// with a clean slate.
if err := cmd.agent.unloadServices(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
return nil, errs
}
if err := cmd.agent.unloadChecks(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
return nil, errs
}
cmd.agent.unloadMetadata()
// Reload service/check definitions and metadata.
if err := cmd.agent.loadServices(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
return nil, errs
}
if err := cmd.agent.loadChecks(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
return nil, errs
}
if err := cmd.agent.loadMetadata(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err))
return nil, errs
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListener(cfg.Addresses.HTTP, cfg.Ports.HTTP)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to determine HTTP address: %v", err))
}
// Deregister the old watches
for _, wp := range cfg.WatchPlans {
wp.Stop()
}
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func(wp *watch.Plan) {
wp.Handler = makeWatchHandler(cmd.logOutput, wp.Exempt["handler"])
wp.LogOutput = cmd.logOutput
if err := wp.Run(httpAddr.String()); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Error running watch: %v", err))
}
}(wp)
}
return newConf, errs
}
func (cmd *Command) Synopsis() string {
return "Runs a Consul agent"
}
func (cmd *Command) Help() string {
helpText := `
Usage: consul agent [options]
Starts the Consul agent and runs until an interrupt is received. The
agent represents a single node in a cluster.
` + cmd.Command.Help()
return strings.TrimSpace(helpText)
}
func printJSON(name string, v interface{}) {
fmt.Println(name)
b, err := json.MarshalIndent(v, "", " ")
if err != nil {
fmt.Printf("%#v\n", v)
return
}
fmt.Println(string(b))
}