Integ test (test/deployer): upgrade test with service mesh (#19658)

* Integ test (test/deployer): upgrade test with service mesh

* license
This commit is contained in:
cskh 2023-11-15 19:32:37 -05:00 committed by GitHub
parent 2591318c82
commit 04a3a3e8d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 692 additions and 25 deletions

View File

@ -402,6 +402,105 @@ jobs:
run: datadog-ci junit upload --service "$GITHUB_REPOSITORY" $TEST_RESULTS_DIR/results.xml
upgrade-integration-test-deployer:
runs-on: ${{ fromJSON(needs.setup.outputs.compute-large ) }}
needs:
- setup
- dev-build
permissions:
id-token: write # NOTE: this permission is explicitly required for Vault auth.
contents: read
strategy:
fail-fast: false
matrix:
consul-version: [ "1.16", "1.17"]
env:
CONSUL_LATEST_VERSION: ${{ matrix.consul-version }}
steps:
- name: Checkout code
uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
with:
ref: ${{ inputs.branch }}
# NOTE: This step is specifically needed for ENT. It allows us to access the required private HashiCorp repos.
- name: Setup Git
if: ${{ endsWith(github.repository, '-enterprise') }}
run: git config --global url."https://${{ secrets.ELEVATED_GITHUB_TOKEN }}:@github.com".insteadOf "https://github.com"
- uses: actions/setup-go@fac708d6674e30b6ba41289acaab6d4b75aa0753 # v4.0.1
with:
go-version-file: 'go.mod'
- run: go env
# Get go binary from workspace
- name: fetch binary
uses: actions/download-artifact@9bc31d5ccc31df68ecc42ccf4149144866c47d8a # v3.0.2
with:
name: '${{ env.CONSUL_BINARY_UPLOAD_NAME }}'
path: .
- name: restore mode+x
run: chmod +x consul
- name: Build image
run: make test-deployer-setup
- name: Upgrade Integration Tests
run: |
mkdir -p "${{ env.TEST_RESULTS_DIR }}"
export NOLOGBUFFER=1
cd ./test-integ/upgrade
docker run --rm ${{ env.CONSUL_LATEST_IMAGE_NAME }}:local consul version
go run gotest.tools/gotestsum@v${{env.GOTESTSUM_VERSION}} \
--raw-command \
--format=standard-verbose \
--debug \
--packages="./..." \
-- \
go test \
-tags "${{ env.GOTAGS }}" \
-timeout=60m \
-parallel=2 \
-json \
./... \
--target-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \
--target-version local \
--latest-image docker.mirror.hashicorp.services/${{ env.CONSUL_LATEST_IMAGE_NAME }} \
--latest-version "${{ env.CONSUL_LATEST_VERSION }}"
env:
# this is needed because of incompatibility between RYUK container and GHA
GOTESTSUM_JUNITFILE: ${{ env.TEST_RESULTS_DIR }}/results.xml
GOTESTSUM_FORMAT: standard-verbose
COMPOSE_INTERACTIVE_NO_CLI: 1
# tput complains if this isn't set to something.
TERM: ansi
# NOTE: ENT specific step as we store secrets in Vault.
- name: Authenticate to Vault
if: ${{ endsWith(github.repository, '-enterprise') }}
id: vault-auth
run: vault-auth
# NOTE: ENT specific step as we store secrets in Vault.
- name: Fetch Secrets
if: ${{ endsWith(github.repository, '-enterprise') }}
id: secrets
uses: hashicorp/vault-action@v2.5.0
with:
url: ${{ steps.vault-auth.outputs.addr }}
caCertificate: ${{ steps.vault-auth.outputs.ca_certificate }}
token: ${{ steps.vault-auth.outputs.token }}
secrets: |
kv/data/github/${{ github.repository }}/datadog apikey | DATADOG_API_KEY;
- name: prepare datadog-ci
if: ${{ !endsWith(github.repository, '-enterprise') }}
run: |
curl -L --fail "https://github.com/DataDog/datadog-ci/releases/latest/download/datadog-ci_linux-x64" --output "/usr/local/bin/datadog-ci"
chmod +x /usr/local/bin/datadog-ci
- name: upload coverage
# do not run on forks
if: github.event.pull_request.head.repo.full_name == github.repository
env:
DATADOG_API_KEY: "${{ endsWith(github.repository, '-enterprise') && env.DATADOG_API_KEY || secrets.DATADOG_API_KEY }}"
DD_ENV: ci
run: datadog-ci junit upload --service "$GITHUB_REPOSITORY" $TEST_RESULTS_DIR/results.xml
test-integrations-success:
needs:
- setup

View File

@ -526,7 +526,7 @@ jobs:
-timeout=20m \
-parallel=2 \
-json \
`go list -tags "${{ env.GOTAGS }}" ./... | grep -v peering_commontopo` \
`go list -tags "${{ env.GOTAGS }}" ./... | grep -v peering_commontopo | grep -v upgrade ` \
--target-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \
--target-version local \
--latest-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \

View File

@ -137,6 +137,14 @@ This helper will rig up a `t.Cleanup` handler that will destroy all resources
created during the test. This can be opted-out of by setting the
`SPRAWL_KEEP_RUNNING=1` environment variable before running the tests.
### Upgrade test
We are migrating upgrade tests from consul-container(`/test/integration`) to
this directory using the [testing/deployer framework](../testing/deployer).
The current implementation supports two upgrade strategies: [standard upgrade](https://developer.hashicorp.com/consul/docs/upgrading/instructions/general-process)
and [autopilot upgrade](https://developer.hashicorp.com/consul/tutorials/datacenter-operations/upgrade-automation). The basic test scenario can be found in `./test-integ/upgrade/basic`.
### Test assertions
Typical service mesh tests want to ensure that use of a service from another

View File

@ -55,6 +55,7 @@ func Test_Snapshot_Restore_Agentless(t *testing.T) {
{Network: "dc1"},
},
},
// Static-server
{
Kind: topology.NodeKindDataplane,
Name: "dc1-client1",
@ -181,5 +182,8 @@ func Test_Snapshot_Restore_Agentless(t *testing.T) {
require.NoError(t, sp.Relaunch(cfg))
// Ensure the static-client connected to the new static-server
asserter.HTTPServiceEchoes(t, staticClient, staticClient.Port, "")
asserter.FortioFetch2HeaderEcho(t, staticClient, &topology.Destination{
ID: staticServerSID,
LocalPort: 5000,
})
}

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/sprawl"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/stretchr/testify/require"
)
@ -459,7 +460,7 @@ func (s *ac6FailoversSuite) test(t *testing.T, ct *commonTopo) {
cfg := ct.Sprawl.Config()
DisableNode(t, cfg, nearClu.Name, s.nearServerNode)
require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, "failover"))
require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, sprawl.LaunchPhaseRegular))
// Clusters for imported services rely on outlier detection for
// failovers, NOT eds_health_status. This means that killing the
// node above does not actually make the envoy cluster UNHEALTHY

View File

@ -0,0 +1,257 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package upgrade
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/sprawl"
"github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/hashicorp/consul/test-integ/topoutil"
)
// The commonTopo comprises 3 agent servers and 3 nodes to run workload
// - workload node 1: static-server
// - workload node 2: static-client
// - workload node 3 (disabled initially): static-server
//
// The post upgrade validation enables workload node 3 to test upgraded
// cluster
type commonTopo struct {
Cfg *topology.Config
Sprawl *sprawl.Sprawl
Assert *topoutil.Asserter
StaticServerSID topology.ID
StaticClientSID topology.ID
StaticServerWorkload *topology.Workload
StaticClientWorkload *topology.Workload
// node index of static-server one
StaticServerInstOne int
// node index of static-server two
StaticServerInstTwo int
}
func NewCommonTopo(t *testing.T) *commonTopo {
t.Helper()
return newCommonTopo(t)
}
func newCommonTopo(t *testing.T) *commonTopo {
t.Helper()
ct := &commonTopo{}
staticServerSID := topology.NewID("static-server", "default", "default")
staticClientSID := topology.NewID("static-client", "default", "default")
cfg := &topology.Config{
Images: topology.Images{
// ConsulEnterprise: "hashicorp/consul-enterprise:local",
},
Networks: []*topology.Network{
{Name: "dc1"},
},
Clusters: []*topology.Cluster{
{
Name: "dc1",
Nodes: []*topology.Node{
{
Kind: topology.NodeKindServer,
Images: utils.LatestImages(),
Name: "dc1-server1",
Addresses: []*topology.Address{
{Network: "dc1"},
},
Meta: map[string]string{
"build": "0.0.1",
},
},
{
Kind: topology.NodeKindServer,
Images: utils.LatestImages(),
Name: "dc1-server2",
Addresses: []*topology.Address{
{Network: "dc1"},
},
Meta: map[string]string{
"build": "0.0.1",
},
},
{
Kind: topology.NodeKindServer,
Images: utils.LatestImages(),
Name: "dc1-server3",
Addresses: []*topology.Address{
{Network: "dc1"},
},
Meta: map[string]string{
"build": "0.0.1",
},
},
{
Kind: topology.NodeKindDataplane,
Name: "dc1-client1",
Workloads: []*topology.Workload{
{
ID: staticServerSID,
Image: "docker.mirror.hashicorp.services/fortio/fortio",
Port: 8080,
EnvoyAdminPort: 19000,
CheckTCP: "127.0.0.1:8080",
Command: []string{
"server",
"-http-port", "8080",
"-redirect-port", "-disabled",
},
},
},
},
{
Kind: topology.NodeKindDataplane,
Name: "dc1-client2",
Workloads: []*topology.Workload{
{
ID: staticClientSID,
Image: "docker.mirror.hashicorp.services/fortio/fortio",
Port: 8080,
EnvoyAdminPort: 19000,
CheckTCP: "127.0.0.1:8080",
Command: []string{
"server",
"-http-port", "8080",
"-redirect-port", "-disabled",
},
Upstreams: []*topology.Destination{
{
ID: staticServerSID,
LocalPort: 5000,
},
},
},
},
},
// Client3 for second static-server
{
Kind: topology.NodeKindDataplane,
Name: "dc1-client3",
Disabled: true,
Workloads: []*topology.Workload{
{
ID: staticServerSID,
Image: "docker.mirror.hashicorp.services/fortio/fortio",
Port: 8080,
EnvoyAdminPort: 19000,
CheckTCP: "127.0.0.1:8080",
Command: []string{
"server",
"-http-port", "8080",
"-redirect-port", "-disabled",
},
},
},
},
},
Enterprise: true,
InitialConfigEntries: []api.ConfigEntry{
&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Partition: "default",
Config: map[string]any{
"protocol": "http",
},
},
&api.ServiceConfigEntry{
Kind: api.ServiceDefaults,
Name: "static-server",
Partition: "default",
Namespace: "default",
},
&api.ServiceIntentionsConfigEntry{
Kind: api.ServiceIntentions,
Name: "static-server",
Partition: "default",
Namespace: "default",
Sources: []*api.SourceIntention{
{
Name: "static-client",
Action: api.IntentionActionAllow},
},
},
},
},
},
}
ct.Cfg = cfg
ct.StaticClientSID = staticClientSID
ct.StaticServerSID = staticServerSID
ct.StaticServerInstOne = 3
ct.StaticServerInstTwo = 5
return ct
}
// PostUpgradeValidation - replace the existing static-server with a new
// instance; verify the connection between static-client and the new instance
func (ct *commonTopo) PostUpgradeValidation(t *testing.T) {
t.Helper()
t.Log("Take down old static-server")
cfg := ct.Sprawl.Config()
cluster := cfg.Cluster("dc1")
cluster.Nodes[ct.StaticServerInstOne].Disabled = true // client 1 -- static-server
require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, sprawl.LaunchPhaseRegular))
// verify static-server is down
ct.Assert.HTTPStatus(t, ct.StaticServerWorkload, ct.StaticServerWorkload.Port, 504)
// Add a new static-server
t.Log("Add a new static server")
cfg = ct.Sprawl.Config()
cluster = cfg.Cluster("dc1")
cluster.Nodes[ct.StaticServerInstTwo].Disabled = false // client 3 -- new static-server
require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, sprawl.LaunchPhaseRegular))
// Ensure the static-client connected to the new static-server
ct.Assert.FortioFetch2HeaderEcho(t, ct.StaticClientWorkload, &topology.Destination{
ID: ct.StaticServerSID,
LocalPort: 5000,
})
}
// calls sprawltest.Launch followed by validating the connection between
// static-client and static-server
func (ct *commonTopo) Launch(t *testing.T) {
t.Helper()
if ct.Sprawl != nil {
t.Fatalf("Launch must only be called once")
}
ct.Sprawl = sprawltest.Launch(t, ct.Cfg)
ct.Assert = topoutil.NewAsserter(ct.Sprawl)
staticServerWorkload := ct.Sprawl.Topology().Clusters["dc1"].WorkloadByID(
topology.NewNodeID("dc1-client1", "default"),
ct.StaticServerSID,
)
ct.Assert.HTTPStatus(t, staticServerWorkload, staticServerWorkload.Port, 200)
staticClientWorkload := ct.Sprawl.Topology().Clusters["dc1"].WorkloadByID(
topology.NewNodeID("dc1-client2", "default"),
ct.StaticClientSID,
)
ct.Assert.FortioFetch2HeaderEcho(t, staticClientWorkload, &topology.Destination{
ID: ct.StaticServerSID,
LocalPort: 5000,
})
ct.StaticServerWorkload = staticServerWorkload
ct.StaticClientWorkload = staticClientWorkload
}

View File

@ -0,0 +1,39 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package upgrade
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/sprawl"
)
// Test_Upgrade_Standard_Basic_Agentless tests upgrading the agent servers
// of a class and validate service mesh after upgrade
//
// Refer to common.go for the detail of the topology
func Test_Upgrade_Standard_Basic_Agentless(t *testing.T) {
t.Parallel()
ct := NewCommonTopo(t)
ct.Launch(t)
t.Log("Start standard upgrade ...")
sp := ct.Sprawl
cfg := sp.Config()
require.NoError(t, ct.Sprawl.LoadKVDataToCluster("dc1", 1, &api.WriteOptions{}))
require.NoError(t, sp.Upgrade(cfg, "dc1", sprawl.UpgradeTypeStandard, utils.TargetImages(), nil))
t.Log("Finished standard upgrade ...")
// verify data is not lost
data, err := ct.Sprawl.GetKV("dc1", "key-0", &api.QueryOptions{})
require.NoError(t, err)
require.NotNil(t, data)
ct.PostUpgradeValidation(t)
}

View File

@ -57,6 +57,25 @@ func GetLatestImageName() string {
return LatestImageName
}
func LatestImages() topology.Images {
img := DockerImage(LatestImageName, LatestVersion)
var set topology.Images
if IsEnterprise() {
set.ConsulEnterprise = img
} else {
set.ConsulCE = img
}
// TODO: have a "latest" dataplane image for testing a service mesh
// complete upgrade of data plane
if cdp := os.Getenv("DEPLOYER_CONSUL_DATAPLANE_IMAGE"); cdp != "" {
set.Dataplane = cdp
}
return set
}
func TargetImages() topology.Images {
img := DockerImage(targetImageName, TargetVersion)
@ -67,6 +86,8 @@ func TargetImages() topology.Images {
set.ConsulCE = img
}
// TODO: have a "target" dataplane image for testing a service mesh
// complete upgrade of data plane
if cdp := os.Getenv("DEPLOYER_CONSUL_DATAPLANE_IMAGE"); cdp != "" {
set.Dataplane = cdp
}

View File

@ -3,6 +3,7 @@ module github.com/hashicorp/consul/testing/deployer
go 1.20
require (
github.com/avast/retry-go v3.0.0+incompatible
github.com/google/go-cmp v0.5.9
github.com/hashicorp/consul-server-connection-manager v0.1.4
github.com/hashicorp/consul/api v1.26.1

View File

@ -15,6 +15,8 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

View File

@ -29,15 +29,31 @@ const (
sharedAgentRecoveryToken = "22082b05-05c9-4a0a-b3da-b9685ac1d688"
)
type LaunchPhase int
const (
LaunchPhaseRegular LaunchPhase = iota
LaunchPhaseUpgrade
)
func (lp LaunchPhase) String() string {
phaseStr := ""
switch lp {
case LaunchPhaseRegular:
phaseStr = "regular"
case LaunchPhaseUpgrade:
phaseStr = "upgrade"
}
return phaseStr
}
func (s *Sprawl) launch() error {
return s.launchType(true)
return s.launchType(true, LaunchPhaseRegular)
}
func (s *Sprawl) relaunch() error {
return s.launchType(false)
func (s *Sprawl) relaunch(launchPhase LaunchPhase) error {
return s.launchType(false, launchPhase)
}
func (s *Sprawl) launchType(firstTime bool) (launchErr error) {
func (s *Sprawl) launchType(firstTime bool, launchPhase LaunchPhase) (launchErr error) {
if err := build.DockerImages(s.logger, s.runner, s.topology); err != nil {
return fmt.Errorf("build.DockerImages: %w", err)
}
@ -121,7 +137,7 @@ func (s *Sprawl) launchType(firstTime bool) (launchErr error) {
s.generator.MarkLaunched()
} else {
if err := s.updateExisting(); err != nil {
if err := s.updateExisting(firstTime, launchPhase); err != nil {
return err
}
}
@ -324,10 +340,19 @@ func (s *Sprawl) createFirstTime() error {
return nil
}
func (s *Sprawl) updateExisting() error {
func (s *Sprawl) updateExisting(firstTime bool, launchPhase LaunchPhase) error {
if launchPhase != LaunchPhaseUpgrade {
if err := s.preRegenTasks(); err != nil {
return fmt.Errorf("preRegenTasks: %w", err)
}
} else {
s.logger.Info("Upgrade - skip preRegenTasks")
for _, cluster := range s.topology.Clusters {
if err := s.createAgentTokens(cluster); err != nil {
return fmt.Errorf("createAgentTokens[%s]: %w", cluster.Name, err)
}
}
}
// We save all of the terraform to the end. Some of the containers will
// be a little broken until we can do stuff like register services to
@ -336,7 +361,7 @@ func (s *Sprawl) updateExisting() error {
return fmt.Errorf("generator[relaunch]: %w", err)
}
if err := s.postRegenTasks(); err != nil {
if err := s.postRegenTasks(firstTime); err != nil {
return fmt.Errorf("postRegenTasks: %w", err)
}
@ -378,10 +403,13 @@ func (s *Sprawl) preRegenTasks() error {
return nil
}
func (s *Sprawl) postRegenTasks() error {
func (s *Sprawl) postRegenTasks(firstTime bool) error {
// rejoinAllConsulServers only for firstTime; otherwise all server agents have retry_join
if firstTime {
if err := s.rejoinAllConsulServers(); err != nil {
return err
}
}
for _, cluster := range s.topology.Clusters {
var err error
@ -390,6 +418,9 @@ func (s *Sprawl) postRegenTasks() error {
// Reconfigure the clients to use a management token.
node := cluster.FirstServer()
if node.Disabled {
continue
}
s.clients[cluster.Name], err = util.ProxyAPIClient(
node.LocalProxyPort(),
node.LocalAddress(),

View File

@ -135,7 +135,10 @@ func (g *Generator) generateAgentHCL(node *topology.Node, enableV2 bool) string
})
if node.IsServer() {
// bootstrap_expect is omitted if this node is a new server
if !node.IsNewServer {
b.add("bootstrap_expect", len(cluster.ServerNodes()))
}
// b.add("translate_wan_addrs", true)
b.addBlock("rpc", func() {
b.add("enable_streaming", true)
@ -165,6 +168,25 @@ func (g *Generator) generateAgentHCL(node *topology.Node, enableV2 bool) string
b.add("enabled", true)
})
// b.addBlock("autopilot", func() {
// b.add("upgrade_version_tag", "build")
// })
if node.AutopilotConfig != nil {
b.addBlock("autopilot", func() {
for k, v := range node.AutopilotConfig {
b.add(k, v)
}
})
}
if node.Meta != nil {
b.addBlock("node_meta", func() {
for k, v := range node.Meta {
b.add(k, v)
}
})
}
} else {
if cluster.Enterprise {
b.add("partition", node.Partition)

View File

@ -7,6 +7,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"net/http"
@ -15,6 +16,7 @@ import (
"strings"
"time"
retry "github.com/avast/retry-go"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/go-hclog"
@ -52,6 +54,11 @@ type Sprawl struct {
grpcConnCancel map[string]func() // one per cluster (when v2 enabled)
}
const (
UpgradeTypeStandard = "standard"
UpgradeTypeAutopilot = "autopilot"
)
// Topology allows access to the topology that defines the resources. Do not
// write to any of these fields.
func (s *Sprawl) Topology() *topology.Topology {
@ -222,12 +229,150 @@ func Launch(
func (s *Sprawl) Relaunch(
cfg *topology.Config,
) error {
return s.RelaunchWithPhase(cfg, "")
return s.RelaunchWithPhase(cfg, LaunchPhaseRegular)
}
func (s *Sprawl) Upgrade(
cfg *topology.Config,
clusterName string,
upgradeType string,
targetImages topology.Images,
newServersInTopology []int,
) error {
cluster := cfg.Cluster(clusterName)
if cluster == nil {
return fmt.Errorf("cluster %s not found in topology", clusterName)
}
leader, err := s.Leader(cluster.Name)
if err != nil {
return fmt.Errorf("error get leader: %w", err)
}
s.logger.Info("Upgrade cluster", "cluster", cluster.Name, "type", upgradeType, "leader", leader.Name)
switch upgradeType {
case UpgradeTypeAutopilot:
err = s.autopilotUpgrade(cfg, cluster, newServersInTopology)
case UpgradeTypeStandard:
err = s.standardUpgrade(cluster, targetImages)
default:
err = fmt.Errorf("upgrade type unsupported %s", upgradeType)
}
if err != nil {
return fmt.Errorf("error upgrading cluster: %w", err)
}
s.logger.Info("After upgrade", "server_nodes", cluster.ServerNodes())
return nil
}
// standardUpgrade upgrades server agents in the cluster to the targetImages
// individually
func (s *Sprawl) standardUpgrade(cluster *topology.Cluster,
targetImages topology.Images) error {
upgradeFn := func(nodeID topology.NodeID) error {
cfgUpgrade := s.Config()
clusterCopy := cfgUpgrade.Cluster(cluster.Name)
// update the server node's image
node := clusterCopy.NodeByID(nodeID)
node.Images = targetImages
s.logger.Info("Upgrading", "node", nodeID.Name, "to_version", node.Images)
err := s.RelaunchWithPhase(cfgUpgrade, LaunchPhaseUpgrade)
if err != nil {
return fmt.Errorf("error relaunch for upgrade: %w", err)
}
s.logger.Info("Relaunch completed", "node", node.Name)
return nil
}
s.logger.Info("Upgrade to", "version", targetImages)
// upgrade servers one at a time
for _, node := range cluster.Nodes {
if node.Kind != topology.NodeKindServer {
s.logger.Info("Skip non-server node", "node", node.Name)
continue
}
if err := upgradeFn(node.ID()); err != nil {
return fmt.Errorf("error upgrading node %s: %w", node.Name, err)
}
}
return nil
}
// autopilotUpgrade upgrades server agents by joining new servers with
// higher version. After upgrade completes, the number of server agents
// are doubled
func (s *Sprawl) autopilotUpgrade(cfg *topology.Config, cluster *topology.Cluster, newServersInTopology []int) error {
leader, err := s.Leader(cluster.Name)
if err != nil {
return fmt.Errorf("error get leader: %w", err)
}
// sanity check for autopilot upgrade
if len(newServersInTopology) < len(cluster.ServerNodes()) {
return fmt.Errorf("insufficient new nodes for autopilot upgrade, expect %d, got %d",
len(cluster.ServerNodes()), len(newServersInTopology))
}
for _, nodeIdx := range newServersInTopology {
node := cluster.Nodes[nodeIdx]
if node.Kind != topology.NodeKindServer {
return fmt.Errorf("node %s kind is not server", node.Name)
}
if !node.Disabled {
return fmt.Errorf("node %s is already enabled", node.Name)
}
node.Disabled = false
node.IsNewServer = true
s.logger.Info("Joining new server", "node", node.Name)
}
err = s.RelaunchWithPhase(cfg, LaunchPhaseUpgrade)
if err != nil {
return fmt.Errorf("error relaunch for upgrade: %w", err)
}
s.logger.Info("Relaunch completed for autopilot upgrade")
// Verify leader is transferred - if upgrade type is autopilot
s.logger.Info("Waiting for leader transfer")
time.Sleep(20 * time.Second)
err = retry.Do(
func() error {
newLeader, err := s.Leader(cluster.Name)
if err != nil {
return fmt.Errorf("error get new leader: %w", err)
}
s.logger.Info("New leader", "addr", newLeader)
if newLeader.Name == leader.Name {
return fmt.Errorf("waiting for leader transfer")
}
return nil
},
retry.MaxDelay(5*time.Second),
retry.Attempts(20),
)
if err != nil {
return fmt.Errorf("Leader transfer failed: %w", err)
}
// Nodes joined the cluster, so we can set all new servers to false
for _, node := range cluster.Nodes {
node.IsNewServer = false
}
return nil
}
func (s *Sprawl) RelaunchWithPhase(
cfg *topology.Config,
phase string,
launchPhase LaunchPhase,
) error {
// Copy this BEFORE compiling so we capture the original definition, without denorms.
var err error
@ -236,9 +381,7 @@ func (s *Sprawl) RelaunchWithPhase(
return err
}
if phase != "" {
s.logger = s.launchLogger.Named(phase)
}
s.logger = s.launchLogger.Named(launchPhase.String())
newTopology, err := topology.Recompile(s.logger.Named("recompile"), cfg, s.topology)
if err != nil {
@ -250,7 +393,7 @@ func (s *Sprawl) RelaunchWithPhase(
s.logger.Debug("compiled replacement topology", "ct", jd(s.topology)) // TODO
start := time.Now()
if err := s.relaunch(); err != nil {
if err := s.relaunch(launchPhase); err != nil {
return err
}
s.logger.Info("topology is ready for use", "elapsed", time.Since(start))
@ -288,6 +431,36 @@ func (s *Sprawl) SnapshotSave(clusterName string) error {
return nil
}
func (s *Sprawl) GetKV(cluster string, key string, queryOpts *api.QueryOptions) ([]byte, error) {
client := s.clients[cluster]
kvClient := client.KV()
data, _, err := kvClient.Get(key, queryOpts)
if err != nil {
return nil, fmt.Errorf("error getting key: %w", err)
}
return data.Value, nil
}
func (s *Sprawl) LoadKVDataToCluster(cluster string, numberOfKeys int, writeOpts *api.WriteOptions) error {
client := s.clients[cluster]
kvClient := client.KV()
for i := 0; i <= numberOfKeys; i++ {
p := &api.KVPair{
Key: fmt.Sprintf("key-%d", i),
}
token := make([]byte, 131072) // 128K size of value
rand.Read(token)
p.Value = token
_, err := kvClient.Put(p, writeOpts)
if err != nil {
return fmt.Errorf("error writing kv: %w", err)
}
}
return nil
}
// Leader returns the cluster leader agent, or an error if no leader is
// available.
func (s *Sprawl) Leader(clusterName string) (*topology.Node, error) {

View File

@ -321,7 +321,7 @@ func (c *Cluster) PartitionQueryOptionsList() []*api.QueryOptions {
func (c *Cluster) ServerNodes() []*Node {
var out []*Node
for _, node := range c.SortedNodes() {
if node.Kind != NodeKindServer || node.Disabled {
if node.Kind != NodeKindServer || node.Disabled || node.IsNewServer {
continue
}
out = append(out, node)
@ -507,6 +507,9 @@ type Node struct {
// computed at topology compile
Index int
// IsNewServer is true if the server joins existing cluster
IsNewServer bool
// generated during network-and-tls
TLSCertPrefix string `json:",omitempty"`
@ -517,6 +520,12 @@ type Node struct {
// ports) and values initialized to zero until terraform creates the pods
// and extracts the exposed port values from output variables.
usedPorts map[int]int // keys are from compile / values are from terraform output vars
// Meta is the node meta added to the node
Meta map[string]string
// AutopilotConfig of the server agent
AutopilotConfig map[string]string
}
func (n *Node) DockerName() string {