Add start-wan-join, retry-wan-join and related configuration options and commandline options

This commit is contained in:
Atin Malaviya 2014-11-14 10:02:42 -05:00
parent 2fafac5aa7
commit 5b41170f47
5 changed files with 300 additions and 2 deletions

View File

@ -53,6 +53,7 @@ func (c *Command) readConfig() *Config {
var cmdConfig Config var cmdConfig Config
var configFiles []string var configFiles []string
var retryInterval string var retryInterval string
var retryWanInterval string
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError) cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
@ -83,12 +84,20 @@ func (c *Command) readConfig() *Config {
"enable re-joining after a previous leave") "enable re-joining after a previous leave")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join", cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join",
"address of agent to join on startup") "address of agent to join on startup")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartWanJoin), "join-wan",
"address of agent to join -wan on startup")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join", cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join",
"address of agent to join on startup with retry") "address of agent to join on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0, cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0,
"number of retries for joining") "number of retries for joining")
cmdFlags.StringVar(&retryInterval, "retry-interval", "", cmdFlags.StringVar(&retryInterval, "retry-interval", "",
"interval between join attempts") "interval between join attempts")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryWanJoin), "retry-wan-join",
"address of agent to join -wan on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryWanMaxAttempts, "retry-wan-max", 0,
"number of retries for joining -wan")
cmdFlags.StringVar(&retryWanInterval, "retry-wan-interval", "",
"interval between join -wan attempts")
if err := cmdFlags.Parse(c.args); err != nil { if err := cmdFlags.Parse(c.args); err != nil {
return nil return nil
@ -103,6 +112,15 @@ func (c *Command) readConfig() *Config {
cmdConfig.RetryInterval = dur cmdConfig.RetryInterval = dur
} }
if retryWanInterval != "" {
dur, err := time.ParseDuration(retryWanInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdConfig.RetryWanInterval = dur
}
config := DefaultConfig() config := DefaultConfig()
if len(configFiles) > 0 { if len(configFiles) > 0 {
fileConfig, err := ReadConfigPaths(configFiles) fileConfig, err := ReadConfigPaths(configFiles)
@ -369,6 +387,22 @@ func (c *Command) startupJoin(config *Config) error {
return nil return nil
} }
// startupWanJoin performs the one-shot WAN join against the addresses listed
// in config.StartWanJoin. It does nothing when the list is empty, and hands
// back the error from JoinWAN unchanged when the join fails.
func (c *Command) startupWanJoin(config *Config) error {
	if len(config.StartWanJoin) == 0 {
		// Nothing was requested, so there is nothing to join.
		return nil
	}

	c.Ui.Output("Joining -wan cluster...")

	joined, err := c.agent.JoinWAN(config.StartWanJoin)
	if err == nil {
		c.Ui.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", joined))
	}
	return err
}
// retryJoin is used to handle retrying a join until it succeeds or all // retryJoin is used to handle retrying a join until it succeeds or all
// retries are exhausted. // retries are exhausted.
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) { func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
@ -400,6 +434,37 @@ func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
} }
} }
// retryWanJoin keeps attempting a WAN join against config.RetryWanJoin until
// one attempt succeeds or the retry budget is used up. It returns immediately
// when no addresses are configured. When config.RetryWanMaxAttempts is
// positive and the number of failed attempts exceeds it, errCh is closed to
// report the failure; a value of zero means the loop never gives up.
func (c *Command) retryWanJoin(config *Config, errCh chan<- struct{}) {
	if len(config.RetryWanJoin) == 0 {
		return
	}

	agentLog := c.agent.logger
	agentLog.Printf("[INFO] agent: Joining WAN cluster...")

	// failures holds the number of failed attempts once the current
	// iteration's join has failed.
	for failures := 1; ; failures++ {
		joined, joinErr := c.agent.JoinWAN(config.RetryWanJoin)
		if joinErr == nil {
			agentLog.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", joined)
			return
		}

		if limit := config.RetryWanMaxAttempts; limit > 0 && failures > limit {
			agentLog.Printf("[ERROR] agent: max join -wan retry exhausted, exiting")
			close(errCh)
			return
		}

		agentLog.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", joinErr,
			config.RetryWanInterval)
		time.Sleep(config.RetryWanInterval)
	}
}
func (c *Command) Run(args []string) int { func (c *Command) Run(args []string) int {
c.Ui = &cli.PrefixedUi{ c.Ui = &cli.PrefixedUi{
OutputPrefix: "==> ", OutputPrefix: "==> ",
@ -482,6 +547,12 @@ func (c *Command) Run(args []string) int {
return 1 return 1
} }
// Join startup WAN nodes if specified
if err := c.startupWanJoin(config); err != nil {
c.Ui.Error(err.Error())
return 1
}
// Register the services // Register the services
for _, service := range config.Services { for _, service := range config.Services {
ns := service.NodeService() ns := service.NodeService()
@ -542,12 +613,16 @@ func (c *Command) Run(args []string) int {
errCh := make(chan struct{}) errCh := make(chan struct{})
go c.retryJoin(config, errCh) go c.retryJoin(config, errCh)
// Start retry -wan join process
errWanCh := make(chan struct{})
go c.retryWanJoin(config, errWanCh)
// Wait for exit // Wait for exit
return c.handleSignals(config, errCh) return c.handleSignals(config, errCh, errWanCh)
} }
// handleSignals blocks until we get an exit-causing signal // handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int { func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}, retryWanJoin <-chan struct{}) int {
signalCh := make(chan os.Signal, 4) signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
@ -563,6 +638,8 @@ WAIT:
sig = os.Interrupt sig = os.Interrupt
case <-retryJoin: case <-retryJoin:
return 1 return 1
case <-retryWanJoin:
return 1
case <-c.agent.ShutdownCh(): case <-c.agent.ShutdownCh():
// Agent is already shutdown! // Agent is already shutdown!
return 0 return 0
@ -721,11 +798,18 @@ Options:
-encrypt=key Provides the gossip encryption key -encrypt=key Provides the gossip encryption key
-join=1.2.3.4 Address of an agent to join at start time. -join=1.2.3.4 Address of an agent to join at start time.
Can be specified multiple times. Can be specified multiple times.
-join-wan=1.2.3.4 Address of an agent to join -wan at start time.
Can be specified multiple times.
-retry-join=1.2.3.4 Address of an agent to join at start time with -retry-join=1.2.3.4 Address of an agent to join at start time with
retries enabled. Can be specified multiple times. retries enabled. Can be specified multiple times.
-retry-interval=30s Time to wait between join attempts. -retry-interval=30s Time to wait between join attempts.
-retry-max=0 Maximum number of join attempts. Defaults to 0, which -retry-max=0 Maximum number of join attempts. Defaults to 0, which
will retry indefinitely. will retry indefinitely.
-retry-wan-join=1.2.3.4 Address of an agent to join -wan at start time with
retries enabled. Can be specified multiple times.
-retry-wan-interval=30s Time to wait between join -wan attempts.
-retry-wan-max=0 Maximum number of join -wan attempts. Defaults to 0, which
will retry indefinitely.
-log-level=info Log level of the agent. -log-level=info Log level of the agent.
-node=hostname Name of this node. Must be unique in the cluster -node=hostname Name of this node. Must be unique in the cluster
-protocol=N Sets the protocol version. Defaults to latest. -protocol=N Sets the protocol version. Defaults to latest.

View File

@ -121,3 +121,86 @@ func TestRetryJoinFail(t *testing.T) {
t.Fatalf("bad: %d", code) t.Fatalf("bad: %d", code)
} }
} }
// TestRetryWanJoin starts a second agent through the command line with
// -retry-wan-join pointed at an already-running agent and waits until the
// first agent reports two WAN members.
func TestRetryWanJoin(t *testing.T) {
	dir, agent := makeAgent(t, nextConfig())
	defer os.RemoveAll(dir)
	defer agent.Shutdown()

	conf2 := nextConfig()
	tmpDir, err := ioutil.TempDir("", "consul")
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer os.RemoveAll(tmpDir)

	doneCh := make(chan struct{})
	shutdownCh := make(chan struct{})

	// On exit, close the command's shutdown channel and wait for the
	// goroutine running cmd.Run to finish.
	defer func() {
		close(shutdownCh)
		<-doneCh
	}()

	cmd := &Command{
		ShutdownCh: shutdownCh,
		Ui:         new(cli.MockUi),
	}

	// A WAN join must target the peer's WAN Serf port, not the LAN one.
	serfAddr := fmt.Sprintf(
		"%s:%d",
		agent.config.BindAddr,
		agent.config.Ports.SerfWan)

	// NOTE(review): -server is passed on the assumption that only server
	// agents can take part in the WAN pool; confirm against Agent.JoinWAN.
	args := []string{
		"-server",
		"-data-dir", tmpDir,
		"-node", fmt.Sprintf(`"%s"`, conf2.NodeName),
		"-retry-wan-join", serfAddr,
		// The WAN retry loop reads RetryWanInterval, which is only set by
		// -retry-wan-interval; -retry-interval affects the LAN loop.
		"-retry-wan-interval", "1s",
	}

	go func() {
		if code := cmd.Run(args); code != 0 {
			log.Printf("bad: %d", code)
		}
		close(doneCh)
	}()

	testutil.WaitForResult(func() (bool, error) {
		mem := agent.WANMembers()
		if len(mem) != 2 {
			return false, fmt.Errorf("bad: %#v", mem)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %s", err)
	})
}
// TestRetryWanJoinFail runs the agent command with -retry-wan-join aimed at
// an address built from an unused test config and a WAN retry limit of one,
// and expects Run to return a non-zero exit code.
func TestRetryWanJoinFail(t *testing.T) {
	conf := nextConfig()
	tmpDir, err := ioutil.TempDir("", "consul")
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer os.RemoveAll(tmpDir)

	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	cmd := &Command{
		ShutdownCh: shutdownCh,
		Ui:         new(cli.MockUi),
	}

	serfAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.Ports.SerfWan)

	// NOTE(review): -server is passed on the assumption that only server
	// agents can take part in the WAN pool; confirm against Agent.JoinWAN.
	args := []string{
		"-server",
		"-data-dir", tmpDir,
		"-retry-wan-join", serfAddr,
		// The WAN retry loop is bounded by RetryWanMaxAttempts, which is set
		// by -retry-wan-max. With -retry-max the WAN limit stays 0 and the
		// loop would retry indefinitely.
		"-retry-wan-max", "1",
	}

	if code := cmd.Run(args); code == 0 {
		t.Fatalf("bad: %d", code)
	}
}

View File

@ -194,6 +194,11 @@ type Config struct {
// addresses, then the agent will error and exit. // addresses, then the agent will error and exit.
StartJoin []string `mapstructure:"start_join"` StartJoin []string `mapstructure:"start_join"`
// StartWanJoin is a list of addresses to attempt to join -wan when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
StartWanJoin []string `mapstructure:"start_wan_join"`
// RetryJoin is a list of addresses to join with retry enabled. // RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `mapstructure:"retry_join"` RetryJoin []string `mapstructure:"retry_join"`
@ -208,6 +213,20 @@ type Config struct {
RetryInterval time.Duration `mapstructure:"-" json:"-"` RetryInterval time.Duration `mapstructure:"-" json:"-"`
RetryIntervalRaw string `mapstructure:"retry_interval"` RetryIntervalRaw string `mapstructure:"retry_interval"`
// RetryWanJoin is a list of addresses to join -wan with retry enabled.
RetryWanJoin []string `mapstructure:"retry_wan_join"`
// RetryWanMaxAttempts specifies the maximum number of times to retry joining a
// -wan host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryWanMaxAttempts int `mapstructure:"retry_wan_max"`
// RetryWanInterval specifies the amount of time to wait in between join
// -wan attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryWanInterval time.Duration `mapstructure:"-" json:"-"`
RetryWanIntervalRaw string `mapstructure:"retry_wan_interval"`
// UiDir is the directory containing the Web UI resources. // UiDir is the directory containing the Web UI resources.
// If provided, the UI endpoints will be enabled. // If provided, the UI endpoints will be enabled.
UiDir string `mapstructure:"ui_dir"` UiDir string `mapstructure:"ui_dir"`
@ -348,6 +367,7 @@ func DefaultConfig() *Config {
ACLDownPolicy: "extend-cache", ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow", ACLDefaultPolicy: "allow",
RetryInterval: 30 * time.Second, RetryInterval: 30 * time.Second,
RetryWanInterval: 30 * time.Second,
} }
} }
@ -505,6 +525,14 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.RetryInterval = dur result.RetryInterval = dur
} }
if raw := result.RetryWanIntervalRaw; raw != "" {
dur, err := time.ParseDuration(raw)
if err != nil {
return nil, fmt.Errorf("RetryWanInterval invalid: %v", err)
}
result.RetryWanInterval = dur
}
// Merge the single recursor // Merge the single recursor
if result.DNSRecursor != "" { if result.DNSRecursor != "" {
result.DNSRecursors = append(result.DNSRecursors, result.DNSRecursor) result.DNSRecursors = append(result.DNSRecursors, result.DNSRecursor)
@ -750,6 +778,12 @@ func MergeConfig(a, b *Config) *Config {
if b.RetryInterval != 0 { if b.RetryInterval != 0 {
result.RetryInterval = b.RetryInterval result.RetryInterval = b.RetryInterval
} }
if b.RetryWanMaxAttempts != 0 {
result.RetryWanMaxAttempts = b.RetryWanMaxAttempts
}
if b.RetryWanInterval != 0 {
result.RetryWanInterval = b.RetryWanInterval
}
if b.DNSConfig.NodeTTL != 0 { if b.DNSConfig.NodeTTL != 0 {
result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL
} }
@ -816,11 +850,21 @@ func MergeConfig(a, b *Config) *Config {
result.StartJoin = append(result.StartJoin, a.StartJoin...) result.StartJoin = append(result.StartJoin, a.StartJoin...)
result.StartJoin = append(result.StartJoin, b.StartJoin...) result.StartJoin = append(result.StartJoin, b.StartJoin...)
// Copy the start join -wan addresses
result.StartWanJoin = make([]string, 0, len(a.StartWanJoin)+len(b.StartWanJoin))
result.StartWanJoin = append(result.StartWanJoin, a.StartWanJoin...)
result.StartWanJoin = append(result.StartWanJoin, b.StartWanJoin...)
// Copy the retry join addresses // Copy the retry join addresses
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin)) result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...) result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...) result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)
// Copy the retry join -wan addresses
result.RetryWanJoin = make([]string, 0, len(a.RetryWanJoin)+len(b.RetryWanJoin))
result.RetryWanJoin = append(result.RetryWanJoin, a.RetryWanJoin...)
result.RetryWanJoin = append(result.RetryWanJoin, b.RetryWanJoin...)
return &result return &result
} }

View File

@ -274,6 +274,23 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config) t.Fatalf("bad: %#v", config)
} }
// Start Wan join
input = `{"start_wan_join": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.StartWanJoin) != 2 {
t.Fatalf("bad: %#v", config)
}
if config.StartWanJoin[0] != "1.1.1.1" {
t.Fatalf("bad: %#v", config)
}
if config.StartWanJoin[1] != "2.2.2.2" {
t.Fatalf("bad: %#v", config)
}
// Retry join // Retry join
input = `{"retry_join": ["1.1.1.1", "2.2.2.2"]}` input = `{"retry_join": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input))) config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -316,6 +333,48 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config) t.Fatalf("bad: %#v", config)
} }
// Retry WAN join
input = `{"retry_wan_join": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.RetryWanJoin) != 2 {
t.Fatalf("bad: %#v", config)
}
if config.RetryWanJoin[0] != "1.1.1.1" {
t.Fatalf("bad: %#v", config)
}
if config.RetryWanJoin[1] != "2.2.2.2" {
t.Fatalf("bad: %#v", config)
}
// Retry WAN interval
input = `{"retry_wan_interval": "10s"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RetryWanIntervalRaw != "10s" {
t.Fatalf("bad: %#v", config)
}
if config.RetryWanInterval.String() != "10s" {
t.Fatalf("bad: %#v", config)
}
// Retry WAN Max
input = `{"retry_wan_max": 3}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RetryWanMaxAttempts != 3 {
t.Fatalf("bad: %#v", config)
}
// UI Dir // UI Dir
input = `{"ui_dir": "/opt/consul-ui"}` input = `{"ui_dir": "/opt/consul-ui"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input))) config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -860,12 +919,16 @@ func TestMergeConfig(t *testing.T) {
Checks: []*CheckDefinition{nil}, Checks: []*CheckDefinition{nil},
Services: []*ServiceDefinition{nil}, Services: []*ServiceDefinition{nil},
StartJoin: []string{"1.1.1.1"}, StartJoin: []string{"1.1.1.1"},
StartWanJoin: []string{"1.1.1.1"},
UiDir: "/opt/consul-ui", UiDir: "/opt/consul-ui",
EnableSyslog: true, EnableSyslog: true,
RejoinAfterLeave: true, RejoinAfterLeave: true,
RetryJoin: []string{"1.1.1.1"}, RetryJoin: []string{"1.1.1.1"},
RetryIntervalRaw: "10s", RetryIntervalRaw: "10s",
RetryInterval: 10 * time.Second, RetryInterval: 10 * time.Second,
RetryWanJoin: []string{"1.1.1.1"},
RetryWanIntervalRaw: "10s",
RetryWanInterval: 10 * time.Second,
CheckUpdateInterval: 8 * time.Minute, CheckUpdateInterval: 8 * time.Minute,
CheckUpdateIntervalRaw: "8m", CheckUpdateIntervalRaw: "8m",
ACLToken: "1234", ACLToken: "1234",

View File

@ -105,6 +105,21 @@ The options below are all specified on the command-line.
with return code 1. By default, this is set to 0, which will continue to with return code 1. By default, this is set to 0, which will continue to
retry the join indefinitely. retry the join indefinitely.
* `-join-wan` - Address of another WAN agent to join upon starting up. This can be
specified multiple times to specify multiple WAN agents to join. If Consul is
unable to join with any of the specified addresses, agent startup will
fail. By default, the agent won't join any nodes over the WAN when it starts up.
* `-retry-wan-join` - Similar to `-retry-join`, but allows retrying a WAN join if the first
attempt fails. This is useful for cases where we know the address will become
available eventually.
* `-retry-wan-interval` - Time to wait between join -wan attempts. Defaults to 30s.
* `-retry-wan-max` - The maximum number of join -wan attempts to be made before exiting
with return code 1. By default, this is set to 0, which will continue to
retry the join -wan indefinitely.
* `-log-level` - The level of logging to show after the Consul agent has * `-log-level` - The level of logging to show after the Consul agent has
started. This defaults to "info". The available log levels are "trace", started. This defaults to "info". The available log levels are "trace",
"debug", "info", "warn", "err". This is the log level that will be shown "debug", "info", "warn", "err". This is the log level that will be shown
@ -339,6 +354,12 @@ definitions support being updated during a reload.
* `retry_interval` - Equivalent to the `-retry-interval` command-line flag. * `retry_interval` - Equivalent to the `-retry-interval` command-line flag.
* `retry_wan_join` - Equivalent to the `-retry-wan-join` command-line flag. Takes a list
of addresses to attempt joining to WAN every `retry_wan_interval` until at least one
join -wan works.
* `retry_wan_interval` - Equivalent to the `-retry-wan-interval` command-line flag.
* `server` - Equivalent to the `-server` command-line flag. * `server` - Equivalent to the `-server` command-line flag.
* `server_name` - When given, this overrides the `node_name` for the TLS certificate. * `server_name` - When given, this overrides the `node_name` for the TLS certificate.
@ -353,6 +374,9 @@ definitions support being updated during a reload.
* `start_join` - An array of strings specifying addresses of nodes to * `start_join` - An array of strings specifying addresses of nodes to
join upon startup. join upon startup.
* `start_wan_join` - An array of strings specifying addresses of WAN nodes to
join -wan upon startup.
* `statsd_addr` - This provides the address of a statsd instance. If provided * `statsd_addr` - This provides the address of a statsd instance. If provided
Consul will send various telemetry information to that instance for aggregation. Consul will send various telemetry information to that instance for aggregation.
This can be used to capture various runtime information. This sends UDP packets This can be used to capture various runtime information. This sends UDP packets