// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package peering

import (
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/hashicorp/consul/testing/deployer/topology"
	"github.com/stretchr/testify/require"
)

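// ac5_2PQFailoverSuite exercises prepared query failover across the peered clusters of the
// common topology. The test flow is listed below; the failing-health-check steps are realized
// by disabling and re-enabling the server's node in the relevant cluster.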
// 1. Setup: register healthy service instances in each of the 3 clusters and create the PQ in one of them
// 2. Execute the PQ: Validate that failover count == 0 and that the PQ results come from the local cluster
// 3. Register a failing TTL health check with the agent managing the service instance in the local cluster
// 4. Execute the PQ: Validate that failover count == 1 and that the PQ results come from the first failover target peer
// 5. Register a failing TTL health check with the agent managing the service instance in the first failover peer
// 6. Execute the PQ: Validate that failover count == 2 and that the PQ results come from the second failover target
// 7. Delete the failing health check from step 5
// 8. Repeat step 4
// 9. Delete the failing health check from step 3
// 10. Repeat step 2
type ac5_2PQFailoverSuite struct {
	clientSID  topology.ServiceID
	serverSID  topology.ServiceID
	nodeServer topology.NodeID
}

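// ac5_2Context records, per cluster and partition, the service IDs and server node created
// during setup so that later test phases can disable and re-enable that node.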
var ac5_2Context = make(map[nodeKey]ac5_2PQFailoverSuite)

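// TestAC5PreparedQueryFailover registers the test services on the common topology, launches
// the topology, and then runs the prepared query failover scenarios.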
func TestAC5PreparedQueryFailover(t *testing.T) {
	ct := NewCommonTopo(t)
	s := &ac5_2PQFailoverSuite{}
	s.setup(t, ct)
	ct.Launch(t)
	s.test(t, ct)
}

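// setup registers the client and server services in all three clusters and exports them to
// the appropriate peers.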
func (s *ac5_2PQFailoverSuite) setup(t *testing.T, ct *commonTopo) {
	s.setupDC(ct, ct.DC1, ct.DC2)
	s.setupDC(ct, ct.DC2, ct.DC1)
	s.setupDC3(ct, ct.DC3, ct.DC1, ct.DC2)
}

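// setupDC registers a non-mesh client and a server service in clu, exports both to peerClu,
// and records the resulting service IDs and server node in ac5_2Context.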
func (s *ac5_2PQFailoverSuite) setupDC(ct *commonTopo, clu, peerClu *topology.Cluster) {
	// TODO: handle all partitions
	partition := "default"
	peer := LocalPeerName(peerClu, partition)

	serverSID := topology.ServiceID{
		Name:      "ac5-server-http",
		Partition: partition,
	}

	clientSID := topology.ServiceID{
		Name:      "ac5-client-http",
		Partition: partition,
	}

	client := serviceExt{
		Service: NewFortioServiceWithDefaults(
			clu.Datacenter,
			clientSID,
			func(s *topology.Service) {
				s.EnvoyAdminPort = 0
				s.DisableServiceMesh = true
			},
		),
		Config: &api.ServiceConfigEntry{
			Kind:      api.ServiceDefaults,
			Name:      clientSID.Name,
			Partition: ConfigEntryPartition(clientSID.Partition),
			Protocol:  "http",
		},
		Exports: []api.ServiceConsumer{{Peer: peer}},
	}

	ct.AddServiceNode(clu, client)

	server := serviceExt{
		Service: NewFortioServiceWithDefaults(
			clu.Datacenter,
			serverSID,
			nil,
		),
		Exports: []api.ServiceConsumer{{Peer: peer}},
	}
	serverNode := ct.AddServiceNode(clu, server)

	ac5_2Context[nodeKey{clu.Datacenter, partition}] = ac5_2PQFailoverSuite{
		clientSID:  clientSID,
		serverSID:  serverSID,
		nodeServer: serverNode.ID(),
	}
}

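// setupDC3 mirrors setupDC for the third cluster, except that the client and server services
// are exported to both of the other peers.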
func (s *ac5_2PQFailoverSuite) setupDC3(ct *commonTopo, clu, peer1, peer2 *topology.Cluster) {
	var (
		peers     []string
		partition = "default"
	)
	peers = append(peers, LocalPeerName(peer1, partition), LocalPeerName(peer2, partition))

	serverSID := topology.ServiceID{
		Name:      "ac5-server-http",
		Partition: partition,
	}

	clientSID := topology.ServiceID{
		Name:      "ac5-client-http",
		Partition: partition,
	}

	// disable service mesh for client in DC3
	client := serviceExt{
		Service: NewFortioServiceWithDefaults(
			clu.Datacenter,
			clientSID,
			func(s *topology.Service) {
				s.EnvoyAdminPort = 0
				s.DisableServiceMesh = true
			},
		),
		Config: &api.ServiceConfigEntry{
			Kind:      api.ServiceDefaults,
			Name:      clientSID.Name,
			Partition: ConfigEntryPartition(clientSID.Partition),
			Protocol:  "http",
		},
		Exports: func() []api.ServiceConsumer {
			var consumers []api.ServiceConsumer
			for _, peer := range peers {
				consumers = append(consumers, api.ServiceConsumer{
					Peer: peer,
				})
			}
			return consumers
		}(),
	}

	ct.AddServiceNode(clu, client)

	server := serviceExt{
		Service: NewFortioServiceWithDefaults(
			clu.Datacenter,
			serverSID,
			nil,
		),
		Exports: func() []api.ServiceConsumer {
			var consumers []api.ServiceConsumer
			for _, peer := range peers {
				consumers = append(consumers, api.ServiceConsumer{
					Peer: peer,
				})
			}
			return consumers
		}(),
	}

	serverNode := ct.AddServiceNode(clu, server)

	ac5_2Context[nodeKey{clu.Datacenter, partition}] = ac5_2PQFailoverSuite{
		clientSID:  clientSID,
		serverSID:  serverSID,
		nodeServer: serverNode.ID(),
	}
}

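// createPreparedQuery creates a prepared query for serviceName with failover targets pointing
// at the DC2 and DC3 peers, returning the query definition and the PreparedQuery client used
// to execute it.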
func (s *ac5_2PQFailoverSuite) createPreparedQuery(t *testing.T, ct *commonTopo, c *api.Client, serviceName, partition string) (*api.PreparedQueryDefinition, *api.PreparedQuery) {
	var (
		peers []string
		err   error
	)
	peers = append(peers, LocalPeerName(ct.DC2, partition), LocalPeerName(ct.DC3, partition))

	def := &api.PreparedQueryDefinition{
		Name: "ac5-prepared-query",
		Service: api.ServiceQuery{
			Service:     serviceName,
			Partition:   ConfigEntryPartition(partition),
			OnlyPassing: true,
			Failover: api.QueryFailoverOptions{
				Targets: func() []api.QueryFailoverTarget {
					var queryFailoverTargets []api.QueryFailoverTarget
					for _, peer := range peers {
						queryFailoverTargets = append(queryFailoverTargets, api.QueryFailoverTarget{
							Peer: peer,
						})
					}
					return queryFailoverTargets
				}(),
			},
		},
	}

	query := c.PreparedQuery()
	def.ID, _, err = query.Create(def, nil)
	require.NoError(t, err, "error creating prepared query in cluster")

	return def, query
}

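// test drives the failover scenarios from DC1: it creates the prepared query, walks the
// failover count up from 0 to 2 by disabling server nodes, and then back down again by
// re-enabling them.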
func (s *ac5_2PQFailoverSuite) test(t *testing.T, ct *commonTopo) {
	partition := "default"
	dc1 := ct.Sprawl.Topology().Clusters[ct.DC1.Name]
	dc2 := ct.Sprawl.Topology().Clusters[ct.DC2.Name]
	dc3 := ct.Sprawl.Topology().Clusters[ct.DC3.Name]

	type testcase struct {
		cluster       *topology.Cluster
		peer          *topology.Cluster
		targetCluster *topology.Cluster
	}
	tcs := []testcase{
		{
			cluster:       dc1,
			peer:          dc2,
			targetCluster: dc3,
		},
	}
	for _, tc := range tcs {
		client := ct.APIClientForCluster(t, tc.cluster)

		t.Run(fmt.Sprintf("%#v", tc), func(t *testing.T) {
			svc := ac5_2Context[nodeKey{tc.cluster.Name, partition}]
			require.NotEmpty(t, svc.serverSID.Name, "expected service name to not be empty")
			require.NotEmpty(t, svc.nodeServer, "expected server node to not be empty")

			assertServiceHealth(t, client, svc.serverSID.Name, 1)
			def, _ := s.createPreparedQuery(t, ct, client, svc.serverSID.Name, partition)
			s.testPreparedQueryZeroFailover(t, client, def, tc.cluster)
			s.testPreparedQuerySingleFailover(t, ct, client, def, tc.cluster, tc.peer, partition)
			s.testPreparedQueryTwoFailovers(t, ct, client, def, tc.cluster, tc.peer, tc.targetCluster, partition)

			// delete failing health check in peer cluster & validate single failover
			s.testPQSingleFailover(t, ct, client, def, tc.cluster, tc.peer, partition)
			// delete failing health check in cluster & validate zero failover
			s.testPQZeroFailover(t, ct, client, def, tc.cluster, tc.peer, partition)
		})
	}
}

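// testPreparedQueryZeroFailover executes the prepared query while all server instances are
// healthy and expects zero failovers, with results served from the local cluster.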
func (s *ac5_2PQFailoverSuite) testPreparedQueryZeroFailover(t *testing.T, cl *api.Client, def *api.PreparedQueryDefinition, cluster *topology.Cluster) {
	t.Run(fmt.Sprintf("prepared query should not failover %s", cluster.Name), func(t *testing.T) {
		// Validate prepared query exists in cluster
		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		require.Len(t, queryDef, 1, "expected 1 prepared query")
		require.Len(t, queryDef[0].Service.Failover.Targets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			// expected outcome should show 0 failovers
			require.Equal(r, 0, queryResult.Failovers, "expected 0 prepared query failovers")
			require.Equal(r, cluster.Name, queryResult.Nodes[0].Node.Datacenter, "pq results should come from the local cluster")
		})
	})
}

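// testPreparedQuerySingleFailover disables the server's node in the local cluster and expects
// the query to fail over once, to the first failover target peer.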
func (s *ac5_2PQFailoverSuite) testPreparedQuerySingleFailover(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("prepared query with single failover %s", cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()
		svc := ac5_2Context[nodeKey{cluster.Name, partition}]

		nodeCfg := DisableNode(t, cfg, cluster.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(nodeCfg))

		// assert server health status
		assertServiceHealth(t, cl, svc.serverSID.Name, 0)

		// Validate prepared query exists in cluster
		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		require.Len(t, queryDef, 1, "expected 1 prepared query")

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)

			require.Equal(r, 1, queryResult.Failovers, "expected 1 prepared query failover")
			require.Equal(r, peerClu.Name, queryResult.Nodes[0].Node.Datacenter, fmt.Sprintf("the pq results should originate from peer cluster %s", peerClu.Name))
			require.Equal(r, pqFailoverTargets[0].Peer, queryResult.Nodes[0].Checks[0].PeerName,
				fmt.Sprintf("pq results should come from the first failover target peer %s", pqFailoverTargets[0].Peer))
		})
	})
}

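// testPreparedQueryTwoFailovers additionally disables the server's node in the first failover
// peer and expects the query to fail over twice, to the second failover target.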
func (s *ac5_2PQFailoverSuite) testPreparedQueryTwoFailovers(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu, targetCluster *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("prepared query with two failovers %s", cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()

		svc := ac5_2Context[nodeKey{peerClu.Name, partition}]

		cfg = DisableNode(t, cfg, peerClu.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		// assert server health status
		assertServiceHealth(t, cl, ac5_2Context[nodeKey{cluster.Name, partition}].serverSID.Name, 0) // cluster: failing
		assertServiceHealth(t, cl, svc.serverSID.Name, 0)                                            // peer cluster: failing

		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)
		require.Len(t, queryDef, 1, "expected 1 prepared query")

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)
			require.Equal(r, 2, queryResult.Failovers, "expected 2 prepared query failovers")

			require.Equal(r, targetCluster.Name, queryResult.Nodes[0].Node.Datacenter, fmt.Sprintf("the pq results should originate from cluster %s", targetCluster.Name))
			require.Equal(r, pqFailoverTargets[1].Peer, queryResult.Nodes[0].Checks[0].PeerName,
				fmt.Sprintf("pq results should come from the second failover target peer %s", pqFailoverTargets[1].Peer))
		})
	})
}

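// testPQSingleFailover re-enables the server's node in the first failover peer and expects the
// failover count to drop back to 1, with results again coming from the first failover target.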
func (s *ac5_2PQFailoverSuite) testPQSingleFailover(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("delete failing health check in %s and validate single failover %s", peerClu.Name, cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()

		svc := ac5_2Context[nodeKey{peerClu.Name, partition}]

		cfg = EnableNode(t, cfg, peerClu.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)
			require.Equal(r, 1, queryResult.Failovers, "expected 1 prepared query failover")

			require.Equal(r, peerClu.Name, queryResult.Nodes[0].Node.Datacenter, fmt.Sprintf("the pq results should originate from cluster %s", peerClu.Name))
			require.Equal(r, pqFailoverTargets[0].Peer, queryResult.Nodes[0].Checks[0].PeerName,
				fmt.Sprintf("pq results should come from the first failover target peer %s", pqFailoverTargets[0].Peer))
		})
	})
}

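// testPQZeroFailover re-enables the server's node in the local cluster and expects the failover
// count to return to 0, with results served from the local cluster again.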
func (s *ac5_2PQFailoverSuite) testPQZeroFailover(t *testing.T, ct *commonTopo, cl *api.Client, def *api.PreparedQueryDefinition, cluster, peerClu *topology.Cluster, partition string) {
	t.Run(fmt.Sprintf("delete failing health check in %s and validate zero failover %s", cluster.Name, cluster.Name), func(t *testing.T) {
		cfg := ct.Sprawl.Config()

		svc := ac5_2Context[nodeKey{cluster.Name, partition}]

		cfg = EnableNode(t, cfg, cluster.Name, svc.nodeServer)
		require.NoError(t, ct.Sprawl.Relaunch(cfg))

		// assert server health status
		assertServiceHealth(t, cl, ac5_2Context[nodeKey{cluster.Name, partition}].serverSID.Name, 1) // cluster: passing
		assertServiceHealth(t, cl, svc.serverSID.Name, 1)                                            // peer cluster: passing

		queryDef, _, err := cl.PreparedQuery().Get(def.ID, nil)
		require.NoError(t, err)

		pqFailoverTargets := queryDef[0].Service.Failover.Targets
		require.Len(t, pqFailoverTargets, 2, "expected 2 prepared query failover targets to dc2 and dc3")

		retry.RunWith(&retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
			queryResult, _, err := cl.PreparedQuery().Execute(def.ID, nil)
			require.NoError(r, err)
			// expected outcome should show 0 failovers
			require.Equal(r, 0, queryResult.Failovers, "expected 0 prepared query failovers")
			require.Equal(r, cluster.Name, queryResult.Nodes[0].Node.Datacenter, "pq results should come from the local cluster")
		})
	})
}

// assertServiceHealth checks that a service has the expected number of passing instances in the catalog
func assertServiceHealth(t *testing.T, cl *api.Client, serverSVC string, count int) {
	t.Helper()
	t.Log("validate service health in catalog")
	retry.RunWith(&retry.Timer{Timeout: time.Second * 20, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
		svcs, _, err := cl.Health().Service(
			serverSVC,
			"",
			true,
			nil,
		)
		require.NoError(r, err)
		require.Len(r, svcs, count)
	})
}