mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
agent: support go-discover retry-join for wan
This commit is contained in:
parent
ad82659eed
commit
14ab5c7641
@ -340,8 +340,8 @@ func (a *Agent) Start() error {
|
||||
}
|
||||
|
||||
// start retry join
|
||||
go a.retryJoin()
|
||||
go a.retryJoinWan()
|
||||
go a.retryJoinLAN()
|
||||
go a.retryJoinWAN()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -2,37 +2,88 @@ package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
discover "github.com/hashicorp/go-discover"
|
||||
)
|
||||
|
||||
// RetryJoin is used to handle retrying a join until it succeeds or all
|
||||
func (a *Agent) retryJoinLAN() {
|
||||
r := &retryJoiner{
|
||||
cluster: "LAN",
|
||||
addrs: a.config.RetryJoin,
|
||||
maxAttempts: a.config.RetryMaxAttempts,
|
||||
interval: a.config.RetryInterval,
|
||||
join: a.JoinLAN,
|
||||
logger: a.logger,
|
||||
}
|
||||
if err := r.retryJoin(); err != nil {
|
||||
a.retryJoinCh <- err
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) retryJoinWAN() {
|
||||
r := &retryJoiner{
|
||||
cluster: "WAN",
|
||||
addrs: a.config.RetryJoinWan,
|
||||
maxAttempts: a.config.RetryMaxAttemptsWan,
|
||||
interval: a.config.RetryIntervalWan,
|
||||
join: a.JoinWAN,
|
||||
logger: a.logger,
|
||||
}
|
||||
if err := r.retryJoin(); err != nil {
|
||||
a.retryJoinCh <- err
|
||||
}
|
||||
}
|
||||
|
||||
// retryJoiner is used to handle retrying a join until it succeeds or all
|
||||
// retries are exhausted.
|
||||
func (a *Agent) retryJoin() {
|
||||
cfg := a.config
|
||||
if len(cfg.RetryJoin) == 0 {
|
||||
return
|
||||
type retryJoiner struct {
|
||||
// cluster is the name of the serf cluster, e.g. "LAN" or "WAN".
|
||||
cluster string
|
||||
|
||||
// addrs is the list of servers or go-discover configurations
|
||||
// to join with.
|
||||
addrs []string
|
||||
|
||||
// maxAttempts is the number of join attempts before giving up.
|
||||
maxAttempts int
|
||||
|
||||
// interval is the time between two join attempts.
|
||||
interval time.Duration
|
||||
|
||||
// join adds the discovered or configured servers to the given
|
||||
// serf cluster.
|
||||
join func([]string) (int, error)
|
||||
|
||||
// logger is the agent logger. Log messages should contain the
|
||||
// "agent: " prefix.
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
func (r *retryJoiner) retryJoin() error {
|
||||
if len(r.addrs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
disco := discover.Discover{}
|
||||
a.logger.Printf("[INFO] agent: Retry join is supported for: %s", strings.Join(disco.Names(), " "))
|
||||
a.logger.Printf("[INFO] agent: Joining cluster...")
|
||||
r.logger.Printf("[INFO] agent: Retry join %s is supported for: %s", r.cluster, strings.Join(disco.Names(), " "))
|
||||
r.logger.Printf("[INFO] agent: Joining %s cluster...", r.cluster)
|
||||
attempt := 0
|
||||
for {
|
||||
var addrs []string
|
||||
var err error
|
||||
|
||||
for _, addr := range cfg.RetryJoin {
|
||||
for _, addr := range r.addrs {
|
||||
switch {
|
||||
case strings.Contains(addr, "provider="):
|
||||
servers, err := disco.Addrs(addr, a.logger)
|
||||
servers, err := disco.Addrs(addr, r.logger)
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] agent: %s", err)
|
||||
r.logger.Printf("[ERR] agent: Join %s: %s", r.cluster, err)
|
||||
} else {
|
||||
addrs = append(addrs, servers...)
|
||||
a.logger.Printf("[INFO] agent: Discovered servers: %s", strings.Join(servers, " "))
|
||||
r.logger.Printf("[INFO] agent: Discovered %s servers: %s", r.cluster, strings.Join(servers, " "))
|
||||
}
|
||||
|
||||
default:
|
||||
@ -41,10 +92,10 @@ func (a *Agent) retryJoin() {
|
||||
}
|
||||
|
||||
if len(addrs) > 0 {
|
||||
n, err := a.JoinLAN(addrs)
|
||||
n, err := r.join(addrs)
|
||||
if err == nil {
|
||||
a.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
|
||||
return
|
||||
r.logger.Printf("[INFO] agent: Join %s completed. Synced with %d initial agents", r.cluster, n)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,42 +104,11 @@ func (a *Agent) retryJoin() {
|
||||
}
|
||||
|
||||
attempt++
|
||||
if cfg.RetryMaxAttempts > 0 && attempt > cfg.RetryMaxAttempts {
|
||||
a.retryJoinCh <- fmt.Errorf("agent: max join retry exhausted, exiting")
|
||||
return
|
||||
if r.maxAttempts > 0 && attempt > r.maxAttempts {
|
||||
return fmt.Errorf("agent: max join %s retry exhausted, exiting", r.cluster)
|
||||
}
|
||||
|
||||
a.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, cfg.RetryInterval)
|
||||
time.Sleep(cfg.RetryInterval)
|
||||
}
|
||||
}
|
||||
|
||||
// RetryJoinWan is used to handle retrying a join -wan until it succeeds or all
|
||||
// retries are exhausted.
|
||||
func (a *Agent) retryJoinWan() {
|
||||
cfg := a.config
|
||||
|
||||
if len(cfg.RetryJoinWan) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
a.logger.Printf("[INFO] agent: Joining WAN cluster...")
|
||||
|
||||
attempt := 0
|
||||
for {
|
||||
n, err := a.JoinWAN(cfg.RetryJoinWan)
|
||||
if err == nil {
|
||||
a.logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n)
|
||||
return
|
||||
}
|
||||
|
||||
attempt++
|
||||
if cfg.RetryMaxAttemptsWan > 0 && attempt > cfg.RetryMaxAttemptsWan {
|
||||
a.retryJoinCh <- fmt.Errorf("agent: max join -wan retry exhausted, exiting")
|
||||
return
|
||||
}
|
||||
|
||||
a.logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err, cfg.RetryIntervalWan)
|
||||
time.Sleep(cfg.RetryIntervalWan)
|
||||
r.logger.Printf("[WARN] agent: Join %s failed: %v, retrying in %v", r.cluster, err, r.interval)
|
||||
time.Sleep(r.interval)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user