766 lines
20 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package sprawl
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
retry "github.com/avast/retry-go"
"github.com/mitchellh/copystructure"
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/runner"
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/secrets"
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/tfgen"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/hashicorp/consul/testing/deployer/util"
)
// TODO: manage workdir externally without chdir
// Sprawl is the definition of a complete running Consul deployment topology.
type Sprawl struct {
logger hclog.Logger
// set after initial Launch is complete
launchLogger hclog.Logger
runner *runner.Runner
license string
secrets secrets.Store
workdir string
// set during Run
config *topology.Config
topology *topology.Topology
generator *tfgen.Generator
clients map[string]*api.Client // one per cluster
grpcConns map[string]*grpc.ClientConn // one per cluster (when v2 enabled)
grpcConnCancel map[string]func() // one per cluster (when v2 enabled)
}
const (
UpgradeTypeStandard = "standard"
UpgradeTypeAutopilot = "autopilot"
)
// Topology allows access to the topology that defines the resources. Do not
// write to any of these fields.
func (s *Sprawl) Topology() *topology.Topology {
return s.topology
}
func (s *Sprawl) Config() *topology.Config {
c2, err := copyConfig(s.config)
if err != nil {
panic(err)
}
return c2
}
// ResourceServiceClientForCluster returns a shared common client that defaults
// to using the management token for this cluster.
func (s *Sprawl) ResourceServiceClientForCluster(clusterName string) pbresource.ResourceServiceClient {
return pbresource.NewResourceServiceClient(s.grpcConns[clusterName])
}
func (s *Sprawl) HTTPClientForCluster(clusterName string) (*http.Client, error) {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("no such cluster: %s", clusterName)
}
// grab the local network for the cluster
network, ok := s.topology.Networks[cluster.NetworkName]
if !ok {
return nil, fmt.Errorf("no such network: %s", cluster.NetworkName)
}
transport, err := util.ProxyHTTPTransport(network.ProxyPort)
if err != nil {
return nil, err
}
return &http.Client{Transport: transport}, nil
}
// LocalAddressForNode returns the local address for the given node in the cluster
func (s *Sprawl) LocalAddressForNode(clusterName string, nid topology.NodeID) (string, error) {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return "", fmt.Errorf("no such cluster: %s", clusterName)
}
node := cluster.NodeByID(nid)
if !node.IsAgent() {
return "", fmt.Errorf("node is not an agent")
}
return node.LocalAddress(), nil
}
// APIClientForNode gets a pooled api.Client connected to the agent running on
// the provided node.
//
// Passing an empty token will assume the bootstrap token. If you want to
// actually use the anonymous token say "-".
func (s *Sprawl) APIClientForNode(clusterName string, nid topology.NodeID, token string) (*api.Client, error) {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("no such cluster: %s", clusterName)
}
nid.Normalize()
node := cluster.NodeByID(nid)
if !node.IsAgent() {
return nil, fmt.Errorf("node is not an agent")
}
switch token {
case "":
token = s.secrets.ReadGeneric(clusterName, secrets.BootstrapToken)
case "-":
token = ""
}
return util.ProxyAPIClient(
node.LocalProxyPort(),
node.LocalAddress(),
8500,
token,
)
}
// APIClientForCluster is a convenience wrapper for APIClientForNode that returns
// an API client for an agent node in the cluster, preferring clients, then servers
func (s *Sprawl) APIClientForCluster(clusterName, token string) (*api.Client, error) {
clu := s.topology.Clusters[clusterName]
// TODO: this always goes to the first client, but we might want to balance this
firstAgent := clu.FirstClient("")
if firstAgent == nil {
firstAgent = clu.FirstServer()
}
if firstAgent == nil {
return nil, fmt.Errorf("failed to find agent in cluster %s", clusterName)
}
return s.APIClientForNode(clusterName, firstAgent.ID(), token)
}
func copyConfig(cfg *topology.Config) (*topology.Config, error) {
dup, err := copystructure.Copy(cfg)
if err != nil {
return nil, err
}
return dup.(*topology.Config), nil
}
// Launch will create the topology defined by the provided configuration and
// bring up all of the relevant clusters. Once created the Stop method must be
// called to destroy everything.
func Launch(
logger hclog.Logger,
workdir string,
cfg *topology.Config,
) (*Sprawl, error) {
if logger == nil {
panic("logger is required")
}
if workdir == "" {
panic("workdir is required")
}
if err := os.MkdirAll(filepath.Join(workdir, "terraform"), 0755); err != nil {
return nil, err
}
runner, err := runner.Load(logger)
if err != nil {
return nil, err
}
// Copy this to avoid leakage.
cfg, err = copyConfig(cfg)
if err != nil {
return nil, err
}
s := &Sprawl{
logger: logger,
runner: runner,
workdir: workdir,
clients: make(map[string]*api.Client),
grpcConns: make(map[string]*grpc.ClientConn),
grpcConnCancel: make(map[string]func()),
}
if err := s.ensureLicense(); err != nil {
return nil, err
}
// Copy this AGAIN, BEFORE compiling so we capture the original definition, without denorms.
s.config, err = copyConfig(cfg)
if err != nil {
return nil, err
}
s.topology, err = topology.Compile(logger.Named("compile"), cfg)
if err != nil {
return nil, fmt.Errorf("topology.Compile: %w", err)
}
s.logger.Debug("compiled topology", "ct", jd(s.topology)) // TODO
start := time.Now()
if err := s.launch(); err != nil {
return nil, err
}
s.logger.Info("topology is ready for use", "elapsed", time.Since(start))
if err := s.PrintDetails(); err != nil {
return nil, fmt.Errorf("error gathering diagnostic details: %w", err)
}
s.launchLogger = s.logger
return s, nil
}
func (s *Sprawl) Relaunch(
cfg *topology.Config,
) error {
return s.RelaunchWithPhase(cfg, LaunchPhaseRegular)
}
// Upgrade upgrades the cluster to the targetImages version
// Parameters:
// - clusterName: the cluster to upgrade
// - upgradeType: the type of upgrade, standard or autopilot
// - targetImages: the target version to upgrade to
// - newServersInTopology: the number of new servers to add to the topology for autopilot upgrade only
// - validationFunc: the validation function to run during upgrade
func (s *Sprawl) Upgrade(
cfg *topology.Config,
clusterName string,
upgradeType string,
targetImages topology.Images,
newServersInTopology []int,
validationFunc func() error,
) error {
cluster := cfg.Cluster(clusterName)
if cluster == nil {
return fmt.Errorf("cluster %s not found in topology", clusterName)
}
leader, err := s.Leader(cluster.Name)
if err != nil {
return fmt.Errorf("error get leader: %w", err)
}
s.logger.Info("Upgrade cluster", "cluster", cluster.Name, "type", upgradeType, "leader", leader.Name)
switch upgradeType {
case UpgradeTypeAutopilot:
err = s.autopilotUpgrade(cfg, cluster, newServersInTopology, validationFunc)
case UpgradeTypeStandard:
err = s.standardUpgrade(cluster, targetImages, validationFunc)
default:
err = fmt.Errorf("upgrade type unsupported %s", upgradeType)
}
if err != nil {
return fmt.Errorf("error upgrading cluster: %w", err)
}
s.logger.Info("After upgrade", "server_nodes", cluster.ServerNodes())
return nil
}
// standardUpgrade upgrades server agents in the cluster to the targetImages
// individually
func (s *Sprawl) standardUpgrade(cluster *topology.Cluster,
targetImages topology.Images, validationFunc func() error) error {
upgradeFn := func(nodeID topology.NodeID) error {
cfgUpgrade := s.Config()
clusterCopy := cfgUpgrade.Cluster(cluster.Name)
// update the server node's image
node := clusterCopy.NodeByID(nodeID)
node.Images = targetImages
s.logger.Info("Upgrading", "node", nodeID.Name, "to_version", node.Images)
err := s.RelaunchWithPhase(cfgUpgrade, LaunchPhaseUpgrade)
if err != nil {
return fmt.Errorf("error relaunch for upgrade: %w", err)
}
s.logger.Info("Relaunch completed", "node", node.Name)
return nil
}
s.logger.Info("Upgrade to", "version", targetImages)
// upgrade servers one at a time
for _, node := range cluster.Nodes {
if node.Kind != topology.NodeKindServer {
s.logger.Info("Skip non-server node", "node", node.Name)
continue
}
if err := upgradeFn(node.ID()); err != nil {
return fmt.Errorf("error upgrading node %s: %w", node.Name, err)
}
// run the validation function after upgrading each server agent
if validationFunc != nil {
if err := validationFunc(); err != nil {
return fmt.Errorf("error validating cluster: %w", err)
}
}
}
// upgrade client agents one at a time
for _, node := range cluster.Nodes {
if node.Kind != topology.NodeKindClient {
s.logger.Info("Skip non-client node", "node", node.Name)
continue
}
if err := upgradeFn(node.ID()); err != nil {
return fmt.Errorf("error upgrading node %s: %w", node.Name, err)
}
// run the validation function after upgrading each client agent
if validationFunc != nil {
if err := validationFunc(); err != nil {
return fmt.Errorf("error validating cluster: %w", err)
}
}
}
return nil
}
// autopilotUpgrade upgrades server agents by joining new servers with
// higher version. After upgrade completes, the number of server agents
// are doubled
func (s *Sprawl) autopilotUpgrade(cfg *topology.Config, cluster *topology.Cluster, newServersInTopology []int, validationFunc func() error) error {
leader, err := s.Leader(cluster.Name)
if err != nil {
return fmt.Errorf("error get leader: %w", err)
}
// sanity check for autopilot upgrade
if len(newServersInTopology) < len(cluster.ServerNodes()) {
return fmt.Errorf("insufficient new nodes for autopilot upgrade, expect %d, got %d",
len(cluster.ServerNodes()), len(newServersInTopology))
}
for _, nodeIdx := range newServersInTopology {
node := cluster.Nodes[nodeIdx]
if node.Kind != topology.NodeKindServer {
return fmt.Errorf("node %s kind is not server", node.Name)
}
if !node.Disabled {
return fmt.Errorf("node %s is already enabled", node.Name)
}
node.Disabled = false
node.IsNewServer = true
s.logger.Info("Joining new server", "node", node.Name)
}
err = s.RelaunchWithPhase(cfg, LaunchPhaseUpgrade)
if err != nil {
return fmt.Errorf("error relaunch for upgrade: %w", err)
}
s.logger.Info("Relaunch completed for autopilot upgrade")
// Verify leader is transferred - if upgrade type is autopilot
s.logger.Info("Waiting for leader transfer")
time.Sleep(20 * time.Second)
err = retry.Do(
func() error {
newLeader, err := s.Leader(cluster.Name)
if err != nil {
return fmt.Errorf("error get new leader: %w", err)
}
s.logger.Info("New leader", "addr", newLeader)
if newLeader.Name == leader.Name {
return fmt.Errorf("waiting for leader transfer")
}
return nil
},
retry.MaxDelay(5*time.Second),
retry.Attempts(20),
)
if err != nil {
return fmt.Errorf("Leader transfer failed: %w", err)
}
// Nodes joined the cluster, so we can set all new servers to false
for _, node := range cluster.Nodes {
node.IsNewServer = false
}
// Run the validation code
if validationFunc != nil {
if err := validationFunc(); err != nil {
return fmt.Errorf("error validating cluster: %w", err)
}
}
return nil
}
// RelaunchWithPhase relaunch the toplogy with the given phase
// and wait for the cluster to be ready (i.e, leadership is established)
func (s *Sprawl) RelaunchWithPhase(
cfg *topology.Config,
launchPhase LaunchPhase,
) error {
// Copy this BEFORE compiling so we capture the original definition, without denorms.
var err error
s.config, err = copyConfig(cfg)
if err != nil {
return err
}
s.logger = s.launchLogger.Named(launchPhase.String())
newTopology, err := topology.Recompile(s.logger.Named("recompile"), cfg, s.topology)
if err != nil {
return fmt.Errorf("topology.Compile: %w", err)
}
s.topology = newTopology
s.logger.Debug("compiled replacement topology", "ct", jd(s.topology)) // TODO
start := time.Now()
if err := s.relaunch(launchPhase); err != nil {
return err
}
s.logger.Info("topology is ready for use", "elapsed", time.Since(start))
if err := s.PrintDetails(); err != nil {
return fmt.Errorf("error gathering diagnostic details: %w", err)
}
return nil
}
// SnapshotSaveAndRestore saves a snapshot of a cluster and then restores the snapshot
func (s *Sprawl) SnapshotSaveAndRestore(clusterName string) error {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return fmt.Errorf("no such cluster: %s", clusterName)
}
var (
client = s.clients[cluster.Name]
)
snapshot := client.Snapshot()
snap, _, err := snapshot.Save(nil)
if err != nil {
return fmt.Errorf("error saving snapshot: %w", err)
}
s.logger.Info("snapshot saved")
time.Sleep(3 * time.Second)
defer snap.Close()
// Restore the snapshot.
if err := snapshot.Restore(nil, snap); err != nil {
return fmt.Errorf("error restoring snapshot: %w", err)
}
s.logger.Info("snapshot restored")
return nil
}
func (s *Sprawl) GetKV(cluster string, key string, queryOpts *api.QueryOptions) ([]byte, error) {
client := s.clients[cluster]
kvClient := client.KV()
data, _, err := kvClient.Get(key, queryOpts)
if err != nil {
return nil, fmt.Errorf("error getting key: %w", err)
}
return data.Value, nil
}
func (s *Sprawl) LoadKVDataToCluster(cluster string, numberOfKeys int, writeOpts *api.WriteOptions) error {
client := s.clients[cluster]
kvClient := client.KV()
for i := 0; i <= numberOfKeys; i++ {
p := &api.KVPair{
Key: fmt.Sprintf("key-%d", i),
}
token := make([]byte, 131072) // 128K size of value
rand.Read(token)
p.Value = token
_, err := kvClient.Put(p, writeOpts)
if err != nil {
return fmt.Errorf("error writing kv: %w", err)
}
}
return nil
}
// Leader returns the cluster leader agent, or an error if no leader is
// available.
func (s *Sprawl) Leader(clusterName string) (*topology.Node, error) {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("no such cluster: %s", clusterName)
}
var (
client = s.clients[cluster.Name]
// logger = s.logger.With("cluster", cluster.Name)
)
leaderAddr, err := getLeader(client)
if err != nil {
return nil, err
}
for _, node := range cluster.Nodes {
if !node.IsServer() || node.Disabled {
continue
}
if strings.HasPrefix(leaderAddr, node.LocalAddress()+":") {
return node, nil
}
}
return nil, fmt.Errorf("leader not found")
}
// Followers returns the cluster following servers.
func (s *Sprawl) Followers(clusterName string) ([]*topology.Node, error) {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("no such cluster: %s", clusterName)
}
leaderNode, err := s.Leader(clusterName)
if err != nil {
return nil, fmt.Errorf("could not determine leader: %w", err)
}
var followers []*topology.Node
for _, node := range cluster.Nodes {
if !node.IsServer() || node.Disabled {
continue
}
if node.ID() != leaderNode.ID() {
followers = append(followers, node)
}
}
return followers, nil
}
func (s *Sprawl) DisabledServers(clusterName string) ([]*topology.Node, error) {
cluster, ok := s.topology.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("no such cluster: %s", clusterName)
}
var servers []*topology.Node
for _, node := range cluster.Nodes {
if !node.IsServer() || !node.Disabled {
continue
}
servers = append(servers, node)
}
return servers, nil
}
func (s *Sprawl) StopContainer(ctx context.Context, containerName string) error {
return s.runner.DockerExec(ctx, []string{"stop", containerName}, nil, nil)
}
func (s *Sprawl) SnapshotEnvoy(ctx context.Context) error {
snapDir := filepath.Join(s.workdir, "envoy-snapshots")
if err := os.MkdirAll(snapDir, 0755); err != nil {
return fmt.Errorf("could not create envoy snapshot output dir %s: %w", snapDir, err)
}
targets := map[string]string{
"config_dump.json": "config_dump",
"clusters.json": "clusters?format=json",
"stats.txt": "stats",
"stats_prometheus.txt": "stats/prometheus",
}
var merr error
for _, c := range s.topology.Clusters {
client, err := s.HTTPClientForCluster(c.Name)
if err != nil {
return fmt.Errorf("could not get http client for cluster %q: %w", c.Name, err)
}
for _, n := range c.Nodes {
if n.Disabled {
continue
}
for _, wrk := range n.Workloads {
if wrk.Disabled || wrk.EnvoyAdminPort <= 0 {
continue
}
prefix := fmt.Sprintf("http://%s:%d", n.LocalAddress(), wrk.EnvoyAdminPort)
for fn, target := range targets {
u := prefix + "/" + target
body, err := scrapeURL(client, u)
if err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not scrape %q for %s on %s: %w",
target, wrk.ID.String(), n.ID().String(), err,
))
continue
}
outFn := filepath.Join(snapDir, n.DockerName()+"--"+wrk.ID.TFString()+"."+fn)
if err := os.WriteFile(outFn+".tmp", body, 0644); err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not write output %q for %s on %s: %w",
target, wrk.ID.String(), n.ID().String(), err,
))
continue
}
if err := os.Rename(outFn+".tmp", outFn); err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not write output %q for %s on %s: %w",
target, wrk.ID.String(), n.ID().String(), err,
))
continue
}
}
}
}
}
return merr
}
func scrapeURL(client *http.Client, url string) ([]byte, error) {
res, err := client.Get(url)
if err != nil {
return nil, err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
return body, nil
}
func (s *Sprawl) CaptureLogs(ctx context.Context) error {
logDir := filepath.Join(s.workdir, "logs")
if err := os.MkdirAll(logDir, 0755); err != nil {
return fmt.Errorf("could not create log output dir %s: %w", logDir, err)
}
containers, err := s.listContainers(ctx)
if err != nil {
return err
}
s.logger.Debug("Capturing logs")
var merr error
for _, container := range containers {
if err := s.dumpContainerLogs(ctx, container, logDir); err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not dump logs for container %s: %w", container, err))
}
}
return merr
}
// Dump known containers out of terraform state file.
func (s *Sprawl) listContainers(ctx context.Context) ([]string, error) {
tfdir := filepath.Join(s.workdir, "terraform")
var buf bytes.Buffer
if err := s.runner.TerraformExec(ctx, []string{"state", "list"}, &buf, tfdir); err != nil {
return nil, fmt.Errorf("error listing containers in terraform state file: %w", err)
}
var (
scan = bufio.NewScanner(&buf)
containers []string
)
for scan.Scan() {
line := strings.TrimSpace(scan.Text())
name := strings.TrimPrefix(line, "docker_container.")
if name != line {
containers = append(containers, name)
continue
}
}
if err := scan.Err(); err != nil {
return nil, err
}
return containers, nil
}
func (s *Sprawl) dumpContainerLogs(ctx context.Context, containerName, outputRoot string) error {
path := filepath.Join(outputRoot, containerName+".log")
f, err := os.Create(path + ".tmp")
if err != nil {
return err
}
keep := false
defer func() {
_ = f.Close()
if !keep {
_ = os.Remove(path + ".tmp")
_ = os.Remove(path)
}
}()
err = s.runner.DockerExecWithStderr(
ctx,
[]string{"logs", containerName},
f,
f,
nil,
)
if err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
if err := os.Rename(path+".tmp", path); err != nil {
return err
}
keep = true
return nil
}