consul/test-integ/peering_commontopo/ac5_2_pq_failover_test.go

408 lines
15 KiB
Go
Raw Normal View History

[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 13:12:13 +00:00
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package peering
import (
"fmt"
"time"
"testing"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/stretchr/testify/require"
)
// 1. Setup: put health service instances in each of the 3 clusters and create the PQ in one of them
// 2. Execute the PQ: Validate that failover count == 0 and that the pq results come from the local cluster
// 3. Register a failing TTL health check with the agent managing the service instance in the local cluster
// 4. Execute the PQ: Validate that failover count == 1 and that the pq results come from the first failover target peer
// 5. Register a failing TTL health check with the agent managing the service instance in the first failover peer
// 6. Execute the PQ: Validate that failover count == 2 and that the pq results come from the second failover target
// 7. Delete failing health check from step 5
// 8. Repeat step 4
// 9. Delete failing health check from step 3
// 10. Repeat step 2
// ac5_2PQFailoverSuite holds the identifiers recorded for one cluster during
// setup: the client and server service IDs and the ID of the node hosting the
// server instance. It is also the receiver for the suite's setup and test
// methods.
type ac5_2PQFailoverSuite struct {
	clientSID, serverSID topology.ServiceID
	nodeServer           topology.NodeID
}
// ac5_2Context stores, per nodeKey (datacenter name and partition), the
// service and node identifiers that setupDC and setupDC3 register, so the
// test phases can look them up after the topology has been launched.
var ac5_2Context = map[nodeKey]ac5_2PQFailoverSuite{}
func TestAC5PreparedQueryFailover(t *testing.T) {
ct := NewCommonTopo(t)
s := &ac5_2PQFailoverSuite{}
s.setup(t, ct)
ct.Launch(t)
s.test(t, ct)
}
// setup registers the suite's client and server services in all three
// clusters. DC1 and DC2 each export to the other; DC3 exports to both DC1 and
// DC2.
func (s *ac5_2PQFailoverSuite) setup(t *testing.T, ct *commonTopo) {
	dc1, dc2, dc3 := ct.DC1, ct.DC2, ct.DC3

	// Each pair is (cluster, cluster it exports to).
	for _, pair := range [][2]*topology.Cluster{{dc1, dc2}, {dc2, dc1}} {
		s.setupDC(ct, pair[0], pair[1])
	}
	s.setupDC3(ct, dc3, dc1, dc2)
}
// setupDC adds the ac5 client and server fortio services to clu and exports
// both of them to the peer derived from peerClu. The two service IDs and the
// server node's ID are stored in ac5_2Context under clu's datacenter and the
// "default" partition so later test phases can retrieve them.
func (s *ac5_2PQFailoverSuite) setupDC(ct *commonTopo, clu, peerClu *topology.Cluster) {
	// TODO: handle all partitions
	const partition = "default"

	var (
		peerName = LocalPeerName(peerClu, partition)
		srvID    = topology.ServiceID{Name: "ac5-server-http", Partition: partition}
		cliID    = topology.ServiceID{Name: "ac5-client-http", Partition: partition}
	)

	// Applied to both services: zero the Envoy admin port and disable the
	// service mesh.
	withoutMesh := func(svc *topology.Service) {
		svc.EnvoyAdminPort = 0
		svc.DisableServiceMesh = true
	}

	// Client first, then server, matching the registration order callers
	// have always observed.
	ct.AddServiceNode(clu, serviceExt{
		Service: NewFortioServiceWithDefaults(clu.Datacenter, cliID, withoutMesh),
		Config: &api.ServiceConfigEntry{
			Kind:      api.ServiceDefaults,
			Name:      cliID.Name,
			Partition: ConfigEntryPartition(cliID.Partition),
			Protocol:  "http",
		},
		Exports: []api.ServiceConsumer{{Peer: peerName}},
	})

	srvNode := ct.AddServiceNode(clu, serviceExt{
		Service: NewFortioServiceWithDefaults(clu.Datacenter, srvID, withoutMesh),
		Exports: []api.ServiceConsumer{{Peer: peerName}},
	})

	ac5_2Context[nodeKey{clu.Datacenter, partition}] = ac5_2PQFailoverSuite{
		clientSID:  cliID,
		serverSID:  srvID,
		nodeServer: srvNode.ID(),
	}
}
// setupDC3 adds the ac5 client and server fortio services to clu and exports
// both of them to the peers derived from peer1 and peer2 (in that order). The
// two service IDs and the server node's ID are stored in ac5_2Context under
// clu's datacenter and the "default" partition.
func (s *ac5_2PQFailoverSuite) setupDC3(ct *commonTopo, clu, peer1, peer2 *topology.Cluster) {
	const partition = "default"

	peerNames := []string{
		LocalPeerName(peer1, partition),
		LocalPeerName(peer2, partition),
	}

	// exportsToPeers returns a freshly allocated consumer list on every call
	// so the client and server entries never share a backing array.
	exportsToPeers := func() []api.ServiceConsumer {
		out := make([]api.ServiceConsumer, 0, len(peerNames))
		for _, name := range peerNames {
			out = append(out, api.ServiceConsumer{Peer: name})
		}
		return out
	}

	// Applied to both services: zero the Envoy admin port and disable the
	// service mesh.
	withoutMesh := func(svc *topology.Service) {
		svc.EnvoyAdminPort = 0
		svc.DisableServiceMesh = true
	}

	srvID := topology.ServiceID{Name: "ac5-server-http", Partition: partition}
	cliID := topology.ServiceID{Name: "ac5-client-http", Partition: partition}

	// disable service mesh for client in DC3
	ct.AddServiceNode(clu, serviceExt{
		Service: NewFortioServiceWithDefaults(clu.Datacenter, cliID, withoutMesh),
		Config: &api.ServiceConfigEntry{
			Kind:      api.ServiceDefaults,
			Name:      cliID.Name,
			Partition: ConfigEntryPartition(cliID.Partition),
			Protocol:  "http",
		},
		Exports: exportsToPeers(),
	})

	srvNode := ct.AddServiceNode(clu, serviceExt{
		Service: NewFortioServiceWithDefaults(clu.Datacenter, srvID, withoutMesh),
		Exports: exportsToPeers(),
	})

	ac5_2Context[nodeKey{clu.Datacenter, partition}] = ac5_2PQFailoverSuite{
		clientSID:  cliID,
		serverSID:  srvID,
		nodeServer: srvNode.ID(),
	}
}
// createPreparedQuery creates the "ac5-prepared-query" prepared query for
// serviceName through client c. The query only returns passing instances and
// lists two failover targets, in order: the peer for ct.DC2, then the peer for
// ct.DC3. It returns the definition (with its ID populated from the create
// call) together with the prepared query API handle used to create it. The
// test is failed immediately if creation returns an error.
func (s *ac5_2PQFailoverSuite) createPreparedQuery(t *testing.T, ct *commonTopo, c *api.Client, serviceName, partition string) (*api.PreparedQueryDefinition, *api.PreparedQuery) {
	// Order matters here: index 0 is the first failover target.
	targets := []api.QueryFailoverTarget{
		{Peer: LocalPeerName(ct.DC2, partition)},
		{Peer: LocalPeerName(ct.DC3, partition)},
	}

	def := &api.PreparedQueryDefinition{Name: "ac5-prepared-query"}
	def.Service = api.ServiceQuery{
		Service:     serviceName,
		Partition:   ConfigEntryPartition(partition),
		OnlyPassing: true,
		Failover:    api.QueryFailoverOptions{Targets: targets},
	}

	pq := c.PreparedQuery()
	id, _, err := pq.Create(def, nil)
	def.ID = id
	require.NoError(t, err, "error creating prepared query in cluster")

	return def, pq
}
// test drives the failover scenario against the launched topology. For each
// test case it creates the prepared query in the local cluster and then walks
// through the phases in order: zero failovers, one failover (local server
// disabled), two failovers (peer server disabled as well), back to one
// failover (peer server re-enabled), and back to zero (local server
// re-enabled).
func (s *ac5_2PQFailoverSuite) test(t *testing.T, ct *commonTopo) {
	partition := "default"

	// Clusters are looked up from the running sprawl's topology by name.
	dc1 := ct.Sprawl.Topology().Clusters[ct.DC1.Name]
	dc2 := ct.Sprawl.Topology().Clusters[ct.DC2.Name]
	dc3 := ct.Sprawl.Topology().Clusters[ct.DC3.Name]

	type testcase struct {
		cluster       *topology.Cluster // cluster where the prepared query is created and executed
		peer          *topology.Cluster // expected source of results after one failover
		targetCluster *topology.Cluster // expected source of results after two failovers
	}
	tcs := []testcase{
		{
			cluster:       dc1,
			peer:          dc2,
			targetCluster: dc3,
		},
	}

	for _, tc := range tcs {
		client := ct.APIClientForCluster(t, tc.cluster)

		// Name the subtest after the clusters involved. Formatting the
		// testcase with %#v would render its pointer fields as memory
		// addresses, which is unreadable and changes on every run.
		name := fmt.Sprintf("cluster=%s,peer=%s,target=%s", tc.cluster.Name, tc.peer.Name, tc.targetCluster.Name)
		t.Run(name, func(t *testing.T) {
			svc, ok := ac5_2Context[nodeKey{tc.cluster.Name, partition}]
			require.True(t, ok, "expected setup to have recorded services for cluster %s", tc.cluster.Name)
			// NotEmpty rather than NotNil: these are a string and a struct
			// value, so a NotNil assertion could never fail.
			require.NotEmpty(t, svc.serverSID.Name, "expected service name to not be empty")
			require.NotEmpty(t, svc.nodeServer, "expected node server to not be empty")

			assertServiceHealth(t, client, svc.serverSID.Name, 1)
			def, _ := s.createPreparedQuery(t, ct, client, svc.serverSID.Name, partition)
			s.testPreparedQueryZeroFailover(t, client, def, tc.cluster)
			s.testPreparedQuerySingleFailover(t, ct, client, def, tc.cluster, tc.peer, partition)
			s.testPreparedQueryTwoFailovers(t, ct, client, def, tc.cluster, tc.peer, tc.targetCluster, partition)

			// delete failing health check in peer cluster & validate single failover
			s.testPQSingleFailover(t, ct, client, def, tc.cluster, tc.peer, partition)

			// delete failing health check in cluster & validate zero failover
			s.testPQZeroFailover(t, ct, client, def, tc.cluster, tc.peer, partition)
		})
	}
}
// testPreparedQueryZeroFailover verifies the baseline: the prepared query
// exists with two failover targets, and executing it through cl reports zero
// failovers with the first returned node located in cluster.
func (s *ac5_2PQFailoverSuite) testPreparedQueryZeroFailover(t *testing.T, cl *api.Client, def *api.PreparedQueryDefinition, cluster *topology.Cluster) {
	t.Run(fmt.Sprintf("prepared query should not failover %s", cluster.Name), func(t *testing.T) {
		// Validate prepared query exists in cluster
		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		require.Len(t, queryDef, 1, "expected 1 prepared query")
		require.Len(t, queryDef[0].Service.Failover.Targets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			// expected outcome should show 0 failover
			require.Equal(r, 0, queryResult.Failovers, "expected 0 prepared query failover")
			// Fail this attempt (so it is retried) instead of panicking on
			// the index below when no nodes are returned yet.
			require.NotEmpty(r, queryResult.Nodes, "expected prepared query to return at least one node")
			require.Equal(r, cluster.Name, queryResult.Nodes[0].Node.Datacenter, "pq results should come from the local cluster")
		})
	})
}
// testPreparedQuerySingleFailover disables the server node recorded for
// cluster, relaunches, and verifies that the local service reports no passing
// instances and that executing the prepared query reports exactly one
// failover, with results coming from peerClu via the first failover target.
func (s *ac5_2PQFailoverSuite) testPreparedQuerySingleFailover(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("prepared query with single failover %s", cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()
		svc := ac5_2Context[nodeKey{cluster.Name, partition}]

		cfg = DisableNode(t, cfg, cluster.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		// assert server health status
		assertServiceHealth(t, cl, svc.serverSID.Name, 0)

		// Validate prepared query exists in cluster
		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		require.Len(t, queryDef, 1, "expected 1 prepared query")

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			require.Equal(r, 1, queryResult.Failovers, "expected 1 prepared query failover")

			// Guard the indexing below so an empty result fails this attempt
			// (and is retried) rather than panicking.
			require.NotEmpty(r, queryResult.Nodes, "expected prepared query to return at least one node")
			entry := queryResult.Nodes[0]
			require.Equal(r, peerClu.Name, entry.Node.Datacenter,
				"the pq results should originate from peer clu %s", peerClu.Name)

			require.NotEmpty(r, entry.Checks, "expected returned node to carry at least one health check")
			require.Equal(r, pqFailoverTargets[0].Peer, entry.Checks[0].PeerName,
				"pq results should come from the first failover target peer %s", pqFailoverTargets[0].Peer)
		})
	})
}
// testPreparedQueryTwoFailovers disables the server node recorded for
// peerClu, relaunches, and verifies that executing the prepared query reports
// exactly two failovers, with results coming from targetCluster via the
// second failover target. It runs after the single-failover phase, so the
// server in cluster is expected to still be disabled.
func (s *ac5_2PQFailoverSuite) testPreparedQueryTwoFailovers(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu, targetCluster *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("prepared query with two failovers %s", cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()
		svc := ac5_2Context[nodeKey{peerClu.Name, partition}]

		cfg = DisableNode(t, cfg, peerClu.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		// assert server health status
		// NOTE(review): both lookups go through cl (the local cluster's
		// client) with nil query options, so the second call checks the
		// peer's service *name* in the local catalog, not the peer cluster's
		// own health view — confirm whether a peer-scoped query was intended.
		assertServiceHealth(t, cl, ac5_2Context[nodeKey{cluster.Name, partition}].serverSID.Name, 0) // cluster: failing
		assertServiceHealth(t, cl, svc.serverSID.Name, 0)                                            // peer cluster's service name: failing

		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		require.Len(t, queryDef, 1, "expected 1 prepared query")

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			require.Equal(r, 2, queryResult.Failovers, "expected 2 prepared query failover")

			// Guard the indexing below so an empty result fails this attempt
			// (and is retried) rather than panicking.
			require.NotEmpty(r, queryResult.Nodes, "expected prepared query to return at least one node")
			entry := queryResult.Nodes[0]
			require.Equal(r, targetCluster.Name, entry.Node.Datacenter,
				"the pq results should originate from cluster %s", targetCluster.Name)

			require.NotEmpty(r, entry.Checks, "expected returned node to carry at least one health check")
			require.Equal(r, pqFailoverTargets[1].Peer, entry.Checks[0].PeerName,
				"pq results should come from the second failover target peer %s", pqFailoverTargets[1].Peer)
		})
	})
}
// testPQSingleFailover re-enables the server node recorded for peerClu,
// relaunches, and verifies that executing the prepared query is back to
// exactly one failover, with results coming from peerClu via the first
// failover target.
func (s *ac5_2PQFailoverSuite) testPQSingleFailover(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("delete failing health check in %s and validate single failover %s", peerClu.Name, cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()
		svc := ac5_2Context[nodeKey{peerClu.Name, partition}]

		cfg = EnableNode(t, cfg, peerClu.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		// Check the length before indexing, as the earlier phases do.
		require.Len(t, queryDef, 1, "expected 1 prepared query")

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			require.Equal(r, 1, queryResult.Failovers, "expected 1 prepared query failover")

			// Guard the indexing below so an empty result fails this attempt
			// (and is retried) rather than panicking.
			require.NotEmpty(r, queryResult.Nodes, "expected prepared query to return at least one node")
			entry := queryResult.Nodes[0]
			require.Equal(r, peerClu.Name, entry.Node.Datacenter,
				"the pq results should originate from cluster %s", peerClu.Name)

			require.NotEmpty(r, entry.Checks, "expected returned node to carry at least one health check")
			// Targets[0] is the first failover target.
			require.Equal(r, pqFailoverTargets[0].Peer, entry.Checks[0].PeerName,
				"pq results should come from the first failover target peer %s", pqFailoverTargets[0].Peer)
		})
	})
}
// testPQZeroFailover re-enables the server node recorded for cluster,
// relaunches, and verifies that the local service has one passing instance
// again and that executing the prepared query reports zero failovers with
// results coming from cluster. The second cluster parameter is unused.
func (s *ac5_2PQFailoverSuite) testPQZeroFailover(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, _ *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("delete failing health check in %s and validate zero failover %s", cluster.Name, cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()
		svc := ac5_2Context[nodeKey{cluster.Name, partition}]

		cfg = EnableNode(t, cfg, cluster.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		// assert server health status
		// A single check suffices: svc is the local cluster's own context
		// entry, so a second lookup by the same key through the same client
		// would repeat this exact assertion.
		assertServiceHealth(t, cl, svc.serverSID.Name, 1) // cluster: passing

		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		// Check the length before indexing, as the earlier phases do.
		require.Len(t, queryDef, 1, "expected 1 prepared query")

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			// expected outcome should show 0 failover
			require.Equal(r, 0, queryResult.Failovers, "expected 0 prepared query failover")
			// Fail this attempt (so it is retried) instead of panicking on
			// the index below when no nodes are returned yet.
			require.NotEmpty(r, queryResult.Nodes, "expected prepared query to return at least one node")
			require.Equal(r, cluster.Name, queryResult.Nodes[0].Node.Datacenter, "pq results should come from the local cluster")
		})
	})
}
// assertServiceHealth queries the health endpoint through cl for passing
// instances of serverSVC (no tag filter, nil query options) and requires that
// exactly count entries come back. The check is retried every 500ms for up to
// 20s.
func assertServiceHealth(t *testing.T, cl *api.Client, serverSVC string, count int) {
	t.Helper()
	t.Log("validate service health in catalog")

	const (
		timeout  = 20 * time.Second
		interval = 500 * time.Millisecond
	)
	timer := &retry.Timer{Timeout: timeout, Wait: interval}

	retry.RunWith(timer, t, func(r *retry.R) {
		const (
			noTag       = ""
			passingOnly = true
		)
		entries, _, err := cl.Health().Service(serverSVC, noTag, passingOnly, nil)
		require.NoError(r, err)
		require.Equal(r, count, len(entries))
	})
}