mirror of https://github.com/status-im/consul.git
120 lines
3.1 KiB
Go
120 lines
3.1 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package sprawl
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
|
|
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/secrets"
|
|
"github.com/hashicorp/consul/testing/deployer/topology"
|
|
"github.com/hashicorp/consul/testing/deployer/util"
|
|
)
|
|
|
|
func (s *Sprawl) getResourceClient(clusterName string) pbresource.ResourceServiceClient {
|
|
return pbresource.NewResourceServiceClient(s.grpcConns[clusterName])
|
|
}
|
|
|
|
func (s *Sprawl) getManagementTokenContext(ctx context.Context, clusterName string) context.Context {
|
|
mgmtToken := s.secrets.ReadGeneric(clusterName, secrets.BootstrapToken)
|
|
//nolint:staticcheck
|
|
return context.WithValue(ctx, "x-consul-token", mgmtToken)
|
|
}
|
|
|
|
func getLeader(client *api.Client) (string, error) {
|
|
leaderAdd, err := client.Status().Leader()
|
|
if err != nil {
|
|
return "", fmt.Errorf("could not query leader: %w", err)
|
|
}
|
|
if leaderAdd == "" {
|
|
return "", errors.New("no leader available")
|
|
}
|
|
return leaderAdd, nil
|
|
}
|
|
|
|
func (s *Sprawl) waitForLeader(cluster *topology.Cluster) {
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
logger.Info("waiting for cluster to elect leader")
|
|
for {
|
|
leader, err := client.Status().Leader()
|
|
if leader != "" && err == nil {
|
|
logger.Info("cluster has leader", "leader_addr", leader)
|
|
return
|
|
}
|
|
logger.Debug("cluster has no leader yet", "error", err)
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (s *Sprawl) rejoinAllConsulServers() error {
|
|
// Join the servers together.
|
|
for _, cluster := range s.topology.Clusters {
|
|
if err := s.rejoinServers(cluster); err != nil {
|
|
return fmt.Errorf("rejoinServers[%s]: %w", cluster.Name, err)
|
|
}
|
|
s.waitForLeader(cluster)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) rejoinServers(cluster *topology.Cluster) error {
|
|
var (
|
|
// client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
servers := cluster.ServerNodes()
|
|
|
|
var recoveryToken string
|
|
if servers[0].Images.GreaterThanVersion(topology.MinVersionAgentTokenPartition) {
|
|
recoveryToken = s.secrets.ReadGeneric(cluster.Name, secrets.AgentRecovery)
|
|
} else {
|
|
recoveryToken = s.secrets.ReadGeneric(cluster.Name, secrets.BootstrapToken)
|
|
}
|
|
|
|
node0, rest := servers[0], servers[1:]
|
|
client, err := util.ProxyNotPooledAPIClient(
|
|
node0.LocalProxyPort(),
|
|
node0.LocalAddress(),
|
|
8500,
|
|
recoveryToken,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("could not get client for %q: %w", node0.ID(), err)
|
|
}
|
|
|
|
logger.Info("joining servers together",
|
|
"from", node0.ID(),
|
|
"rest", nodeSliceToNodeIDSlice(rest),
|
|
)
|
|
for _, node := range rest {
|
|
for {
|
|
err = client.Agent().Join(node.LocalAddress(), false)
|
|
if err == nil {
|
|
break
|
|
}
|
|
logger.Warn("could not join", "from", node0.ID(), "to", node.ID(), "error", err)
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func nodeSliceToNodeIDSlice(nodes []*topology.Node) []topology.NodeID {
|
|
var out []topology.NodeID
|
|
for _, node := range nodes {
|
|
out = append(out, node.ID())
|
|
}
|
|
return out
|
|
}
|