From 04a3a3e8d0f8d68bb70546c87fadc39fd718f303 Mon Sep 17 00:00:00 2001
From: cskh
Date: Wed, 15 Nov 2023 19:32:37 -0500
Subject: [PATCH] Integ test (test/deployer): upgrade test with service mesh
 (#19658)

* Integ test (test/deployer): upgrade test with service mesh

* license
---
 .../workflows/nightly-test-integrations.yml  |  99 +++++++
 .github/workflows/test-integrations.yml      |   2 +-
 test-integ/README.md                         |   8 +
 test-integ/connect/snapshot_test.go          |   6 +-
 .../peering_commontopo/ac6_failovers_test.go |   3 +-
 test-integ/upgrade/basic/common.go           | 257 ++++++++++++++++++
 .../upgrade/basic/upgrade_basic_test.go      |  39 +++
 .../consul-container/libs/utils/version.go   |  21 ++
 testing/deployer/go.mod                      |   1 +
 testing/deployer/go.sum                      |   2 +
 testing/deployer/sprawl/boot.go              |  59 +++-
 .../deployer/sprawl/internal/tfgen/agent.go  |  24 +-
 testing/deployer/sprawl/sprawl.go            | 185 ++++++++++++-
 testing/deployer/topology/topology.go        |  11 +-
 14 files changed, 692 insertions(+), 25 deletions(-)
 create mode 100644 test-integ/upgrade/basic/common.go
 create mode 100644 test-integ/upgrade/basic/upgrade_basic_test.go

diff --git a/.github/workflows/nightly-test-integrations.yml b/.github/workflows/nightly-test-integrations.yml
index bab962f23e..20754b3529 100644
--- a/.github/workflows/nightly-test-integrations.yml
+++ b/.github/workflows/nightly-test-integrations.yml
@@ -402,6 +402,105 @@ jobs:
         run: datadog-ci junit upload --service "$GITHUB_REPOSITORY" $TEST_RESULTS_DIR/results.xml
 
+  upgrade-integration-test-deployer:
+    runs-on: ${{ fromJSON(needs.setup.outputs.compute-large ) }}
+    needs:
+      - setup
+      - dev-build
+    permissions:
+      id-token: write # NOTE: this permission is explicitly required for Vault auth.
+      contents: read
+    strategy:
+      fail-fast: false
+      matrix:
+        consul-version: [ "1.16", "1.17" ]
+    env:
+      CONSUL_LATEST_VERSION: ${{ matrix.consul-version }}
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
+        with:
+          ref: ${{ inputs.branch }}
+      # NOTE: This step is specifically needed for ENT. It allows us to access the required private HashiCorp repos.
+      - name: Setup Git
+        if: ${{ endsWith(github.repository, '-enterprise') }}
+        run: git config --global url."https://${{ secrets.ELEVATED_GITHUB_TOKEN }}:@github.com".insteadOf "https://github.com"
+      - uses: actions/setup-go@fac708d6674e30b6ba41289acaab6d4b75aa0753 # v4.0.1
+        with:
+          go-version-file: 'go.mod'
+      - run: go env
+
+      # Get the dev-build consul binary from the workspace
+      - name: fetch binary
+        uses: actions/download-artifact@9bc31d5ccc31df68ecc42ccf4149144866c47d8a # v3.0.2
+        with:
+          name: '${{ env.CONSUL_BINARY_UPLOAD_NAME }}'
+          path: .
+      - name: restore mode+x
+        run: chmod +x consul
+      - name: Build image
+        run: make test-deployer-setup
+      - name: Upgrade Integration Tests
+        run: |
+          mkdir -p "${{ env.TEST_RESULTS_DIR }}"
+          export NOLOGBUFFER=1
+          cd ./test-integ/upgrade
+          docker run --rm ${{ env.CONSUL_LATEST_IMAGE_NAME }}:local consul version
+          go run gotest.tools/gotestsum@v${{env.GOTESTSUM_VERSION}} \
+            --raw-command \
+            --format=standard-verbose \
+            --debug \
+            --packages="./..." \
+            -- \
+            go test \
+            -tags "${{ env.GOTAGS }}" \
+            -timeout=60m \
+            -parallel=2 \
+            -json \
+            ./... \
+            --target-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \
+            --target-version local \
+            --latest-image docker.mirror.hashicorp.services/${{ env.CONSUL_LATEST_IMAGE_NAME }} \
+            --latest-version "${{ env.CONSUL_LATEST_VERSION }}"
+        env:
+          # this is needed because of incompatibility between RYUK container and GHA
+          GOTESTSUM_JUNITFILE: ${{ env.TEST_RESULTS_DIR }}/results.xml
+          GOTESTSUM_FORMAT: standard-verbose
+          COMPOSE_INTERACTIVE_NO_CLI: 1
+          # tput complains if this isn't set to something.
+          TERM: ansi
+      # NOTE: ENT specific step as we store secrets in Vault.
+      - name: Authenticate to Vault
+        if: ${{ endsWith(github.repository, '-enterprise') }}
+        id: vault-auth
+        run: vault-auth
+
+      # NOTE: ENT specific step as we store secrets in Vault.
+      - name: Fetch Secrets
+        if: ${{ endsWith(github.repository, '-enterprise') }}
+        id: secrets
+        uses: hashicorp/vault-action@v2.5.0
+        with:
+          url: ${{ steps.vault-auth.outputs.addr }}
+          caCertificate: ${{ steps.vault-auth.outputs.ca_certificate }}
+          token: ${{ steps.vault-auth.outputs.token }}
+          secrets: |
+            kv/data/github/${{ github.repository }}/datadog apikey | DATADOG_API_KEY;
+
+      - name: prepare datadog-ci
+        if: ${{ !endsWith(github.repository, '-enterprise') }}
+        run: |
+          curl -L --fail "https://github.com/DataDog/datadog-ci/releases/latest/download/datadog-ci_linux-x64" --output "/usr/local/bin/datadog-ci"
+          chmod +x /usr/local/bin/datadog-ci
+
+      - name: upload coverage
+        # do not run on forks
+        if: github.event.pull_request.head.repo.full_name == github.repository
+        env:
+          DATADOG_API_KEY: "${{ endsWith(github.repository, '-enterprise') && env.DATADOG_API_KEY || secrets.DATADOG_API_KEY }}"
+          DD_ENV: ci
+        run: datadog-ci junit upload --service "$GITHUB_REPOSITORY" $TEST_RESULTS_DIR/results.xml
+
   test-integrations-success:
     needs:
       - setup
diff --git a/.github/workflows/test-integrations.yml b/.github/workflows/test-integrations.yml
index 980ea3d26c..f3c803e8fe 100644
--- a/.github/workflows/test-integrations.yml
+++ b/.github/workflows/test-integrations.yml
@@ -526,7 +526,7 @@ jobs:
           -timeout=20m \
           -parallel=2 \
           -json \
-          `go list -tags "${{ env.GOTAGS }}" ./... | grep -v peering_commontopo` \
+          `go list -tags "${{ env.GOTAGS }}" ./... | grep -v peering_commontopo | grep -v upgrade` \
           --target-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \
           --target-version local \
           --latest-image ${{ env.CONSUL_LATEST_IMAGE_NAME }} \
diff --git a/test-integ/README.md b/test-integ/README.md
index f7232dec25..4ed0b92759 100644
--- a/test-integ/README.md
+++ b/test-integ/README.md
@@ -137,6 +137,14 @@ This helper will rig up a `t.Cleanup` handler that will destroy all resources
 created during the test. This can be opted-out of by setting the
 `SPRAWL_KEEP_RUNNING=1` environment variable before running the tests.
 
+### Upgrade test
+
+We are migrating upgrade tests from consul-container (`/test/integration`) to
+this directory using the [testing/deployer framework](../testing/deployer).
+
+The current implementation supports two upgrade strategies: [standard upgrade](https://developer.hashicorp.com/consul/docs/upgrading/instructions/general-process)
+and [autopilot upgrade](https://developer.hashicorp.com/consul/tutorials/datacenter-operations/upgrade-automation).
+The basic test scenario for each strategy can be found in `./test-integ/upgrade/basic`.
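+
+For illustration, a standard-upgrade test built on this framework looks
+roughly like the following sketch (condensed from
+`./test-integ/upgrade/basic/upgrade_basic_test.go` in this change):
+
+```go
+func Test_Upgrade_Standard_Basic_Agentless(t *testing.T) {
+	t.Parallel()
+
+	// Build and launch the topology: 3 servers plus static-server and
+	// static-client workloads, all starting on the "latest" images.
+	ct := NewCommonTopo(t)
+	ct.Launch(t)
+
+	// Seed some KV data so we can verify nothing is lost across the upgrade.
+	sp := ct.Sprawl
+	require.NoError(t, sp.LoadKVDataToCluster("dc1", 1, &api.WriteOptions{}))
+
+	// Upgrade the server agents one at a time to the target image.
+	require.NoError(t, sp.Upgrade(sp.Config(), "dc1",
+		sprawl.UpgradeTypeStandard, utils.TargetImages(), nil))
+
+	// The data survived and the mesh still routes traffic.
+	data, err := sp.GetKV("dc1", "key-0", &api.QueryOptions{})
+	require.NoError(t, err)
+	require.NotNil(t, data)
+	ct.PostUpgradeValidation(t)
+}
+```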
+
+
 ### Test assertions
 
 Typical service mesh tests want to ensure that use of a service from another
diff --git a/test-integ/connect/snapshot_test.go b/test-integ/connect/snapshot_test.go
index 48978bac04..1e8d5f18c2 100644
--- a/test-integ/connect/snapshot_test.go
+++ b/test-integ/connect/snapshot_test.go
@@ -55,6 +55,7 @@ func Test_Snapshot_Restore_Agentless(t *testing.T) {
 				{Network: "dc1"},
 			},
 		},
+		// Static-server
 		{
 			Kind: topology.NodeKindDataplane,
 			Name: "dc1-client1",
@@ -181,5 +182,8 @@ func Test_Snapshot_Restore_Agentless(t *testing.T) {
 	require.NoError(t, sp.Relaunch(cfg))
 
 	// Ensure the static-client connected to the new static-server
-	asserter.HTTPServiceEchoes(t, staticClient, staticClient.Port, "")
+	asserter.FortioFetch2HeaderEcho(t, staticClient, &topology.Destination{
+		ID:        staticServerSID,
+		LocalPort: 5000,
+	})
 }
diff --git a/test-integ/peering_commontopo/ac6_failovers_test.go b/test-integ/peering_commontopo/ac6_failovers_test.go
index 66155cad2f..ad71cf477b 100644
--- a/test-integ/peering_commontopo/ac6_failovers_test.go
+++ b/test-integ/peering_commontopo/ac6_failovers_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
+	"github.com/hashicorp/consul/testing/deployer/sprawl"
 	"github.com/hashicorp/consul/testing/deployer/topology"
 	"github.com/stretchr/testify/require"
 )
@@ -459,7 +460,7 @@ func (s *ac6FailoversSuite) test(t *testing.T, ct *commonTopo) {
 
 	cfg := ct.Sprawl.Config()
 	DisableNode(t, cfg, nearClu.Name, s.nearServerNode)
-	require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, "failover"))
+	require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, sprawl.LaunchPhaseRegular))
 	// Clusters for imported services rely on outlier detection for
 	// failovers, NOT eds_health_status. This means that killing the
 	// node above does not actually make the envoy cluster UNHEALTHY
diff --git a/test-integ/upgrade/basic/common.go b/test-integ/upgrade/basic/common.go
new file mode 100644
index 0000000000..a7b44d5adf
--- /dev/null
+++ b/test-integ/upgrade/basic/common.go
@@ -0,0 +1,257 @@
+// SPDX-License-Identifier: BUSL-1.1 + +package upgrade + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" + "github.com/hashicorp/consul/testing/deployer/sprawl" + "github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest" + "github.com/hashicorp/consul/testing/deployer/topology" + + "github.com/hashicorp/consul/test-integ/topoutil" +) + +// The commonTopo comprises 3 agent servers and 3 nodes to run workload +// - workload node 1: static-server +// - workload node 2: static-client +// - workload node 3 (disabled initially): static-server +// +// The post upgrade validation enables workload node 3 to test upgraded +// cluster +type commonTopo struct { + Cfg *topology.Config + + Sprawl *sprawl.Sprawl + Assert *topoutil.Asserter + + StaticServerSID topology.ID + StaticClientSID topology.ID + + StaticServerWorkload *topology.Workload + StaticClientWorkload *topology.Workload + + // node index of static-server one + StaticServerInstOne int + // node index of static-server two + StaticServerInstTwo int +} + +func NewCommonTopo(t *testing.T) *commonTopo { + t.Helper() + return newCommonTopo(t) +} + +func newCommonTopo(t *testing.T) *commonTopo { + t.Helper() + + ct := &commonTopo{} + staticServerSID := topology.NewID("static-server", "default", "default") + staticClientSID := topology.NewID("static-client", "default", "default") + + cfg := &topology.Config{ + Images: topology.Images{ + // ConsulEnterprise: "hashicorp/consul-enterprise:local", + }, + Networks: []*topology.Network{ + {Name: "dc1"}, + }, + Clusters: []*topology.Cluster{ + { + Name: "dc1", + Nodes: []*topology.Node{ + { + Kind: topology.NodeKindServer, + Images: utils.LatestImages(), + Name: "dc1-server1", + Addresses: []*topology.Address{ + {Network: "dc1"}, + }, + Meta: map[string]string{ + "build": "0.0.1", + }, + }, + { + Kind: topology.NodeKindServer, + Images: utils.LatestImages(), + Name: "dc1-server2", + Addresses: []*topology.Address{ + {Network: "dc1"}, + }, + Meta: map[string]string{ + "build": "0.0.1", + }, + }, + { + Kind: topology.NodeKindServer, + Images: utils.LatestImages(), + Name: "dc1-server3", + Addresses: []*topology.Address{ + {Network: "dc1"}, + }, + Meta: map[string]string{ + "build": "0.0.1", + }, + }, + { + Kind: topology.NodeKindDataplane, + Name: "dc1-client1", + Workloads: []*topology.Workload{ + { + ID: staticServerSID, + Image: "docker.mirror.hashicorp.services/fortio/fortio", + Port: 8080, + EnvoyAdminPort: 19000, + CheckTCP: "127.0.0.1:8080", + Command: []string{ + "server", + "-http-port", "8080", + "-redirect-port", "-disabled", + }, + }, + }, + }, + { + Kind: topology.NodeKindDataplane, + Name: "dc1-client2", + Workloads: []*topology.Workload{ + { + ID: staticClientSID, + Image: "docker.mirror.hashicorp.services/fortio/fortio", + Port: 8080, + EnvoyAdminPort: 19000, + CheckTCP: "127.0.0.1:8080", + Command: []string{ + "server", + "-http-port", "8080", + "-redirect-port", "-disabled", + }, + Upstreams: []*topology.Destination{ + { + ID: staticServerSID, + LocalPort: 5000, + }, + }, + }, + }, + }, + // Client3 for second static-server + { + Kind: topology.NodeKindDataplane, + Name: "dc1-client3", + Disabled: true, + Workloads: []*topology.Workload{ + { + ID: staticServerSID, + Image: "docker.mirror.hashicorp.services/fortio/fortio", + Port: 8080, + EnvoyAdminPort: 19000, + CheckTCP: "127.0.0.1:8080", + Command: []string{ + "server", + "-http-port", "8080", + 
"-redirect-port", "-disabled", + }, + }, + }, + }, + }, + Enterprise: true, + InitialConfigEntries: []api.ConfigEntry{ + &api.ProxyConfigEntry{ + Kind: api.ProxyDefaults, + Name: "global", + Partition: "default", + Config: map[string]any{ + "protocol": "http", + }, + }, + &api.ServiceConfigEntry{ + Kind: api.ServiceDefaults, + Name: "static-server", + Partition: "default", + Namespace: "default", + }, + &api.ServiceIntentionsConfigEntry{ + Kind: api.ServiceIntentions, + Name: "static-server", + Partition: "default", + Namespace: "default", + Sources: []*api.SourceIntention{ + { + Name: "static-client", + Action: api.IntentionActionAllow}, + }, + }, + }, + }, + }, + } + + ct.Cfg = cfg + ct.StaticClientSID = staticClientSID + ct.StaticServerSID = staticServerSID + + ct.StaticServerInstOne = 3 + ct.StaticServerInstTwo = 5 + return ct +} + +// PostUpgradeValidation - replace the existing static-server with a new +// instance; verify the connection between static-client and the new instance +func (ct *commonTopo) PostUpgradeValidation(t *testing.T) { + t.Helper() + t.Log("Take down old static-server") + cfg := ct.Sprawl.Config() + cluster := cfg.Cluster("dc1") + cluster.Nodes[ct.StaticServerInstOne].Disabled = true // client 1 -- static-server + require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, sprawl.LaunchPhaseRegular)) + // verify static-server is down + ct.Assert.HTTPStatus(t, ct.StaticServerWorkload, ct.StaticServerWorkload.Port, 504) + + // Add a new static-server + t.Log("Add a new static server") + cfg = ct.Sprawl.Config() + cluster = cfg.Cluster("dc1") + cluster.Nodes[ct.StaticServerInstTwo].Disabled = false // client 3 -- new static-server + require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, sprawl.LaunchPhaseRegular)) + // Ensure the static-client connected to the new static-server + ct.Assert.FortioFetch2HeaderEcho(t, ct.StaticClientWorkload, &topology.Destination{ + ID: ct.StaticServerSID, + LocalPort: 5000, + }) +} + +// calls sprawltest.Launch followed by validating the connection between +// static-client and static-server +func (ct *commonTopo) Launch(t *testing.T) { + t.Helper() + if ct.Sprawl != nil { + t.Fatalf("Launch must only be called once") + } + ct.Sprawl = sprawltest.Launch(t, ct.Cfg) + ct.Assert = topoutil.NewAsserter(ct.Sprawl) + + staticServerWorkload := ct.Sprawl.Topology().Clusters["dc1"].WorkloadByID( + topology.NewNodeID("dc1-client1", "default"), + ct.StaticServerSID, + ) + ct.Assert.HTTPStatus(t, staticServerWorkload, staticServerWorkload.Port, 200) + + staticClientWorkload := ct.Sprawl.Topology().Clusters["dc1"].WorkloadByID( + topology.NewNodeID("dc1-client2", "default"), + ct.StaticClientSID, + ) + ct.Assert.FortioFetch2HeaderEcho(t, staticClientWorkload, &topology.Destination{ + ID: ct.StaticServerSID, + LocalPort: 5000, + }) + + ct.StaticServerWorkload = staticServerWorkload + ct.StaticClientWorkload = staticClientWorkload +} diff --git a/test-integ/upgrade/basic/upgrade_basic_test.go b/test-integ/upgrade/basic/upgrade_basic_test.go new file mode 100644 index 0000000000..aae804b2ec --- /dev/null +++ b/test-integ/upgrade/basic/upgrade_basic_test.go @@ -0,0 +1,39 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package upgrade + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" + "github.com/hashicorp/consul/testing/deployer/sprawl" +) + +// Test_Upgrade_Standard_Basic_Agentless tests upgrading the agent servers +// of a class and validate service mesh after upgrade +// +// Refer to common.go for the detail of the topology +func Test_Upgrade_Standard_Basic_Agentless(t *testing.T) { + t.Parallel() + + ct := NewCommonTopo(t) + ct.Launch(t) + + t.Log("Start standard upgrade ...") + sp := ct.Sprawl + cfg := sp.Config() + require.NoError(t, ct.Sprawl.LoadKVDataToCluster("dc1", 1, &api.WriteOptions{})) + require.NoError(t, sp.Upgrade(cfg, "dc1", sprawl.UpgradeTypeStandard, utils.TargetImages(), nil)) + t.Log("Finished standard upgrade ...") + + // verify data is not lost + data, err := ct.Sprawl.GetKV("dc1", "key-0", &api.QueryOptions{}) + require.NoError(t, err) + require.NotNil(t, data) + + ct.PostUpgradeValidation(t) +} diff --git a/test/integration/consul-container/libs/utils/version.go b/test/integration/consul-container/libs/utils/version.go index 1f62306f44..d1adcd1928 100644 --- a/test/integration/consul-container/libs/utils/version.go +++ b/test/integration/consul-container/libs/utils/version.go @@ -57,6 +57,25 @@ func GetLatestImageName() string { return LatestImageName } +func LatestImages() topology.Images { + img := DockerImage(LatestImageName, LatestVersion) + + var set topology.Images + if IsEnterprise() { + set.ConsulEnterprise = img + } else { + set.ConsulCE = img + } + + // TODO: have a "latest" dataplane image for testing a service mesh + // complete upgrade of data plane + if cdp := os.Getenv("DEPLOYER_CONSUL_DATAPLANE_IMAGE"); cdp != "" { + set.Dataplane = cdp + } + + return set +} + func TargetImages() topology.Images { img := DockerImage(targetImageName, TargetVersion) @@ -67,6 +86,8 @@ func TargetImages() topology.Images { set.ConsulCE = img } + // TODO: have a "target" dataplane image for testing a service mesh + // complete upgrade of data plane if cdp := os.Getenv("DEPLOYER_CONSUL_DATAPLANE_IMAGE"); cdp != "" { set.Dataplane = cdp } diff --git a/testing/deployer/go.mod b/testing/deployer/go.mod index c7916a013c..d091194242 100644 --- a/testing/deployer/go.mod +++ b/testing/deployer/go.mod @@ -3,6 +3,7 @@ module github.com/hashicorp/consul/testing/deployer go 1.20 require ( + github.com/avast/retry-go v3.0.0+incompatible github.com/google/go-cmp v0.5.9 github.com/hashicorp/consul-server-connection-manager v0.1.4 github.com/hashicorp/consul/api v1.26.1 diff --git a/testing/deployer/go.sum b/testing/deployer/go.sum index e632e8be79..abb76fa0a7 100644 --- a/testing/deployer/go.sum +++ b/testing/deployer/go.sum @@ -15,6 +15,8 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= 
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/testing/deployer/sprawl/boot.go b/testing/deployer/sprawl/boot.go index 6c3344366b..b8b123482e 100644 --- a/testing/deployer/sprawl/boot.go +++ b/testing/deployer/sprawl/boot.go @@ -29,15 +29,31 @@ const ( sharedAgentRecoveryToken = "22082b05-05c9-4a0a-b3da-b9685ac1d688" ) +type LaunchPhase int + +const ( + LaunchPhaseRegular LaunchPhase = iota + LaunchPhaseUpgrade +) + +func (lp LaunchPhase) String() string { + phaseStr := "" + switch lp { + case LaunchPhaseRegular: + phaseStr = "regular" + case LaunchPhaseUpgrade: + phaseStr = "upgrade" + } + return phaseStr +} + func (s *Sprawl) launch() error { - return s.launchType(true) + return s.launchType(true, LaunchPhaseRegular) } - -func (s *Sprawl) relaunch() error { - return s.launchType(false) +func (s *Sprawl) relaunch(launchPhase LaunchPhase) error { + return s.launchType(false, launchPhase) } - -func (s *Sprawl) launchType(firstTime bool) (launchErr error) { +func (s *Sprawl) launchType(firstTime bool, launchPhase LaunchPhase) (launchErr error) { if err := build.DockerImages(s.logger, s.runner, s.topology); err != nil { return fmt.Errorf("build.DockerImages: %w", err) } @@ -121,7 +137,7 @@ func (s *Sprawl) launchType(firstTime bool) (launchErr error) { s.generator.MarkLaunched() } else { - if err := s.updateExisting(); err != nil { + if err := s.updateExisting(firstTime, launchPhase); err != nil { return err } } @@ -324,9 +340,18 @@ func (s *Sprawl) createFirstTime() error { return nil } -func (s *Sprawl) updateExisting() error { - if err := s.preRegenTasks(); err != nil { - return fmt.Errorf("preRegenTasks: %w", err) +func (s *Sprawl) updateExisting(firstTime bool, launchPhase LaunchPhase) error { + if launchPhase != LaunchPhaseUpgrade { + if err := s.preRegenTasks(); err != nil { + return fmt.Errorf("preRegenTasks: %w", err) + } + } else { + s.logger.Info("Upgrade - skip preRegenTasks") + for _, cluster := range s.topology.Clusters { + if err := s.createAgentTokens(cluster); err != nil { + return fmt.Errorf("createAgentTokens[%s]: %w", cluster.Name, err) + } + } } // We save all of the terraform to the end. Some of the containers will @@ -336,7 +361,7 @@ func (s *Sprawl) updateExisting() error { return fmt.Errorf("generator[relaunch]: %w", err) } - if err := s.postRegenTasks(); err != nil { + if err := s.postRegenTasks(firstTime); err != nil { return fmt.Errorf("postRegenTasks: %w", err) } @@ -378,9 +403,12 @@ func (s *Sprawl) preRegenTasks() error { return nil } -func (s *Sprawl) postRegenTasks() error { - if err := s.rejoinAllConsulServers(); err != nil { - return err +func (s *Sprawl) postRegenTasks(firstTime bool) error { + // rejoinAllConsulServers only for firstTime; otherwise all server agents have retry_join + if firstTime { + if err := s.rejoinAllConsulServers(); err != nil { + return err + } } for _, cluster := range s.topology.Clusters { @@ -390,6 +418,9 @@ func (s *Sprawl) postRegenTasks() error { // Reconfigure the clients to use a management token. 
node := cluster.FirstServer() + if node.Disabled { + continue + } s.clients[cluster.Name], err = util.ProxyAPIClient( node.LocalProxyPort(), node.LocalAddress(), diff --git a/testing/deployer/sprawl/internal/tfgen/agent.go b/testing/deployer/sprawl/internal/tfgen/agent.go index ee77c09a58..36071624bf 100644 --- a/testing/deployer/sprawl/internal/tfgen/agent.go +++ b/testing/deployer/sprawl/internal/tfgen/agent.go @@ -135,7 +135,10 @@ func (g *Generator) generateAgentHCL(node *topology.Node, enableV2 bool) string }) if node.IsServer() { - b.add("bootstrap_expect", len(cluster.ServerNodes())) + // bootstrap_expect is omitted if this node is a new server + if !node.IsNewServer { + b.add("bootstrap_expect", len(cluster.ServerNodes())) + } // b.add("translate_wan_addrs", true) b.addBlock("rpc", func() { b.add("enable_streaming", true) @@ -165,6 +168,25 @@ func (g *Generator) generateAgentHCL(node *topology.Node, enableV2 bool) string b.add("enabled", true) }) + // b.addBlock("autopilot", func() { + // b.add("upgrade_version_tag", "build") + // }) + + if node.AutopilotConfig != nil { + b.addBlock("autopilot", func() { + for k, v := range node.AutopilotConfig { + b.add(k, v) + } + }) + } + + if node.Meta != nil { + b.addBlock("node_meta", func() { + for k, v := range node.Meta { + b.add(k, v) + } + }) + } } else { if cluster.Enterprise { b.add("partition", node.Partition) diff --git a/testing/deployer/sprawl/sprawl.go b/testing/deployer/sprawl/sprawl.go index f8e1d07416..e06c949564 100644 --- a/testing/deployer/sprawl/sprawl.go +++ b/testing/deployer/sprawl/sprawl.go @@ -7,6 +7,7 @@ import ( "bufio" "bytes" "context" + "crypto/rand" "fmt" "io" "net/http" @@ -15,6 +16,7 @@ import ( "strings" "time" + retry "github.com/avast/retry-go" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/go-hclog" @@ -52,6 +54,11 @@ type Sprawl struct { grpcConnCancel map[string]func() // one per cluster (when v2 enabled) } +const ( + UpgradeTypeStandard = "standard" + UpgradeTypeAutopilot = "autopilot" +) + // Topology allows access to the topology that defines the resources. Do not // write to any of these fields. 
 func (s *Sprawl) Topology() *topology.Topology {
@@ -222,12 +229,150 @@ func Launch(
 func (s *Sprawl) Relaunch(
 	cfg *topology.Config,
 ) error {
-	return s.RelaunchWithPhase(cfg, "")
+	return s.RelaunchWithPhase(cfg, LaunchPhaseRegular)
+}
+
+func (s *Sprawl) Upgrade(
+	cfg *topology.Config,
+	clusterName string,
+	upgradeType string,
+	targetImages topology.Images,
+	newServersInTopology []int,
+) error {
+	cluster := cfg.Cluster(clusterName)
+	if cluster == nil {
+		return fmt.Errorf("cluster %s not found in topology", clusterName)
+	}
+
+	leader, err := s.Leader(cluster.Name)
+	if err != nil {
+		return fmt.Errorf("error getting leader: %w", err)
+	}
+	s.logger.Info("Upgrade cluster", "cluster", cluster.Name, "type", upgradeType, "leader", leader.Name)
+
+	switch upgradeType {
+	case UpgradeTypeAutopilot:
+		err = s.autopilotUpgrade(cfg, cluster, newServersInTopology)
+	case UpgradeTypeStandard:
+		err = s.standardUpgrade(cluster, targetImages)
+	default:
+		err = fmt.Errorf("unsupported upgrade type: %s", upgradeType)
+	}
+	if err != nil {
+		return fmt.Errorf("error upgrading cluster: %w", err)
+	}
+
+	s.logger.Info("After upgrade", "server_nodes", cluster.ServerNodes())
+	return nil
+}
+
+// standardUpgrade upgrades the server agents in the cluster to the
+// targetImages, one server at a time.
+func (s *Sprawl) standardUpgrade(cluster *topology.Cluster,
+	targetImages topology.Images) error {
+	upgradeFn := func(nodeID topology.NodeID) error {
+		cfgUpgrade := s.Config()
+		clusterCopy := cfgUpgrade.Cluster(cluster.Name)
+
+		// update the server node's image
+		node := clusterCopy.NodeByID(nodeID)
+		node.Images = targetImages
+		s.logger.Info("Upgrading", "node", nodeID.Name, "to_version", node.Images)
+		err := s.RelaunchWithPhase(cfgUpgrade, LaunchPhaseUpgrade)
+		if err != nil {
+			return fmt.Errorf("error relaunching for upgrade: %w", err)
+		}
+		s.logger.Info("Relaunch completed", "node", node.Name)
+		return nil
+	}
+
+	s.logger.Info("Upgrade to", "version", targetImages)
+
+	// upgrade servers one at a time
+	for _, node := range cluster.Nodes {
+		if node.Kind != topology.NodeKindServer {
+			s.logger.Info("Skip non-server node", "node", node.Name)
+			continue
+		}
+		if err := upgradeFn(node.ID()); err != nil {
+			return fmt.Errorf("error upgrading node %s: %w", node.Name, err)
+		}
+	}
+	return nil
+}
+
+// autopilotUpgrade upgrades the server agents by joining new servers that
+// run a higher version. After the upgrade completes, the number of server
+// agents is doubled.
+func (s *Sprawl) autopilotUpgrade(cfg *topology.Config, cluster *topology.Cluster, newServersInTopology []int) error {
+	leader, err := s.Leader(cluster.Name)
+	if err != nil {
+		return fmt.Errorf("error getting leader: %w", err)
+	}
+
+	// sanity check for autopilot upgrade
+	if len(newServersInTopology) < len(cluster.ServerNodes()) {
+		return fmt.Errorf("insufficient new nodes for autopilot upgrade, expected %d, got %d",
+			len(cluster.ServerNodes()), len(newServersInTopology))
+	}
+
+	for _, nodeIdx := range newServersInTopology {
+		node := cluster.Nodes[nodeIdx]
+		if node.Kind != topology.NodeKindServer {
+			return fmt.Errorf("node %s kind is not server", node.Name)
+		}
+
+		if !node.Disabled {
+			return fmt.Errorf("node %s is already enabled", node.Name)
+		}
+
+		node.Disabled = false
+		node.IsNewServer = true
+
+		s.logger.Info("Joining new server", "node", node.Name)
+	}
+
+	err = s.RelaunchWithPhase(cfg, LaunchPhaseUpgrade)
+	if err != nil {
+		return fmt.Errorf("error relaunching for upgrade: %w", err)
+	}
+	s.logger.Info("Relaunch completed for autopilot upgrade")
+
+	// Verify that leadership is transferred to one of the new servers.
+	s.logger.Info("Waiting for leader transfer")
+	time.Sleep(20 * time.Second)
+	err = retry.Do(
+		func() error {
+			newLeader, err := s.Leader(cluster.Name)
+			if err != nil {
+				return fmt.Errorf("error getting new leader: %w", err)
+			}
+			s.logger.Info("New leader", "name", newLeader.Name)
+
+			if newLeader.Name == leader.Name {
+				return fmt.Errorf("waiting for leader transfer")
+			}
+
+			return nil
+		},
+		retry.MaxDelay(5*time.Second),
+		retry.Attempts(20),
+	)
+	if err != nil {
+		return fmt.Errorf("leader transfer failed: %w", err)
+	}
+
+	// The new servers have joined the cluster, so clear the IsNewServer flag
+	// on every node.
+	for _, node := range cluster.Nodes {
+		node.IsNewServer = false
+	}
+
+	return nil
 }
 
 func (s *Sprawl) RelaunchWithPhase(
 	cfg *topology.Config,
-	phase string,
+	launchPhase LaunchPhase,
 ) error {
 	// Copy this BEFORE compiling so we capture the original definition, without denorms.
 	var err error
@@ -236,9 +381,7 @@ func (s *Sprawl) RelaunchWithPhase(
 		return err
 	}
 
-	if phase != "" {
-		s.logger = s.launchLogger.Named(phase)
-	}
+	s.logger = s.launchLogger.Named(launchPhase.String())
 
 	newTopology, err := topology.Recompile(s.logger.Named("recompile"), cfg, s.topology)
 	if err != nil {
@@ -250,7 +393,7 @@ func (s *Sprawl) RelaunchWithPhase(
 	s.logger.Debug("compiled replacement topology", "ct", jd(s.topology)) // TODO
 
 	start := time.Now()
-	if err := s.relaunch(); err != nil {
+	if err := s.relaunch(launchPhase); err != nil {
 		return err
 	}
 	s.logger.Info("topology is ready for use", "elapsed", time.Since(start))
@@ -288,6 +431,36 @@ func (s *Sprawl) SnapshotSave(clusterName string) error {
 	return nil
 }
 
+func (s *Sprawl) GetKV(cluster string, key string, queryOpts *api.QueryOptions) ([]byte, error) {
+	client := s.clients[cluster]
+	kvClient := client.KV()
+
+	data, _, err := kvClient.Get(key, queryOpts)
+	if err != nil {
+		return nil, fmt.Errorf("error getting key: %w", err)
+	}
+	if data == nil {
+		return nil, fmt.Errorf("key %q not found", key)
+	}
+	return data.Value, nil
+}
+
+func (s *Sprawl) LoadKVDataToCluster(cluster string, numberOfKeys int, writeOpts *api.WriteOptions) error {
+	client := s.clients[cluster]
+	kvClient := client.KV()
+
+	// writes keys key-0 through key-<numberOfKeys>
+	for i := 0; i <= numberOfKeys; i++ {
+		p := &api.KVPair{
+			Key: fmt.Sprintf("key-%d", i),
+		}
+		token := make([]byte, 131072) // 128 KB value
+		if _, err := rand.Read(token); err != nil {
+			return fmt.Errorf("error generating random value: %w", err)
+		}
+		p.Value = token
+		_, err := kvClient.Put(p, writeOpts)
+		if err != nil {
+			return fmt.Errorf("error writing kv: %w", err)
+		}
+	}
+	return nil
+}
+
 // Leader returns the cluster leader agent, or an error if no leader is
 // available.
 func (s *Sprawl) Leader(clusterName string) (*topology.Node, error) {
diff --git a/testing/deployer/topology/topology.go b/testing/deployer/topology/topology.go
index 10e9add9e9..77fa38ae8f 100644
--- a/testing/deployer/topology/topology.go
+++ b/testing/deployer/topology/topology.go
@@ -321,7 +321,7 @@ func (c *Cluster) PartitionQueryOptionsList() []*api.QueryOptions {
 func (c *Cluster) ServerNodes() []*Node {
 	var out []*Node
 	for _, node := range c.SortedNodes() {
-		if node.Kind != NodeKindServer || node.Disabled {
+		if node.Kind != NodeKindServer || node.Disabled || node.IsNewServer {
 			continue
 		}
 		out = append(out, node)
@@ -507,6 +507,9 @@ type Node struct {
 	// computed at topology compile
 	Index int
 
+	// IsNewServer is true if the server is joining an existing cluster.
+	IsNewServer bool
+
 	// generated during network-and-tls
 	TLSCertPrefix string `json:",omitempty"`
 
@@ -517,6 +520,12 @@ type Node struct {
 	// ports) and values initialized to zero until terraform creates the pods
 	// and extracts the exposed port values from output variables.
 	usedPorts map[int]int // keys are from compile / values are from terraform output vars
+
+	// Meta is the node metadata, rendered into the agent's node_meta config.
+	Meta map[string]string
+
+	// AutopilotConfig is the autopilot configuration for a server agent.
+	AutopilotConfig map[string]string
 }
 
 func (n *Node) DockerName() string {
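
For reference, the autopilot strategy added above would be driven along these
lines. This is a hypothetical sketch, not part of the patch: it assumes the
commonTopo is extended with three extra standby server nodes (declared with
`Disabled: true` and `Images: utils.TargetImages()`) whose indexes in
`cluster.Nodes` are 6, 7, and 8; the basic scenario in this change does not
declare such nodes, and the test name is illustrative only.

```go
// Hypothetical autopilot-upgrade driver built on the same commonTopo.
func Test_Upgrade_Autopilot_Basic_Agentless(t *testing.T) {
	ct := NewCommonTopo(t)
	ct.Launch(t)

	sp := ct.Sprawl
	// Join the standby servers running the target image; Upgrade waits for
	// leadership to transfer to one of them before returning.
	require.NoError(t, sp.Upgrade(sp.Config(), "dc1",
		sprawl.UpgradeTypeAutopilot, utils.TargetImages(), []int{6, 7, 8}))

	ct.PostUpgradeValidation(t)
}
```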