mirror of https://github.com/status-im/consul.git
chore: apply enterprise changes that were missed in some testing files (#19504)
This should align CE ef35525 with ENT 7f95226dbe40151c8f17dd4464784b60cf358dc1 in:
- testing/integration/consul-container
- test-integ
- testing/deployer
This commit is contained in:
parent fd128f4947
commit 65592d91a8
@ -232,8 +232,6 @@ func (s *ac1BasicSuite) test(t *testing.T, ct *commonTopo) {
|
|||
// probably not worth the speed boost
|
||||
ct.Assert.HealthyWithPeer(t, dc.Name, svcServerHTTP.ID, LocalPeerName(peer, "default"))
|
||||
ct.Assert.HealthyWithPeer(t, dc.Name, svcServerTCP.ID, LocalPeerName(peer, "default"))
|
||||
ct.Assert.UpstreamEndpointHealthy(t, svcClientTCP, ac.upstreamTCP)
|
||||
ct.Assert.UpstreamEndpointHealthy(t, svcClientTCP, ac.upstreamHTTP)
|
||||
|
||||
tcs := []struct {
|
||||
acSub int
@ -172,7 +172,6 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) {
|
|||
// these could be done parallel with each other, but complexity
|
||||
// probably not worth the speed boost
|
||||
ct.Assert.HealthyWithPeer(t, dc.Name, svcServer.ID, LocalPeerName(peer, "default"))
|
||||
ct.Assert.UpstreamEndpointHealthy(t, svcClient, s.upstream)
|
||||
// TODO: we need to let the upstream start serving properly before we do this. if it
|
||||
// isn't ready and returns a 5xx (which it will do if it's not up yet!), it will stick
|
||||
// in a down state for PassiveHealthCheck.Interval
@ -159,7 +159,6 @@ func (s *ac4ProxyDefaultsSuite) test(t *testing.T, ct *commonTopo) {
|
|||
|
||||
// preconditions check
|
||||
ct.Assert.HealthyWithPeer(t, dc.Name, serverSVC.ID, LocalPeerName(peer, "default"))
|
||||
ct.Assert.UpstreamEndpointHealthy(t, clientSVC, s.upstream)
|
||||
ct.Assert.FortioFetch2HeaderEcho(t, clientSVC, s.upstream)
|
||||
|
||||
t.Run("Validate services exist in catalog", func(t *testing.T) {
@ -30,11 +30,15 @@ type ac5_2PQFailoverSuite struct {
|
|||
serverSID topology.ServiceID
|
||||
nodeServer topology.NodeID
|
||||
}
|
||||
type nodeKey struct {
|
||||
dc string
|
||||
partition string
|
||||
}
|
||||
|
||||
var ac5_2Context = make(map[nodeKey]ac5_2PQFailoverSuite)
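The ac5_2Context map above keys per-suite state by datacenter and partition through a small comparable struct; a minimal standalone sketch of the same composite-key pattern (names here are illustrative, not from the test):

package main

import "fmt"

// key mirrors the nodeKey idea: a comparable struct used directly as a map key,
// so there is exactly one entry per (datacenter, partition) pair.
type key struct {
	dc        string
	partition string
}

func main() {
	state := map[key]string{}
	state[key{dc: "dc1", partition: "default"}] = "suite-a"
	state[key{dc: "dc2", partition: "part1"}] = "suite-b"
	fmt.Println(state[key{dc: "dc1", partition: "default"}]) // suite-a
}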
|
||||
|
||||
func TestAC5PreparedQueryFailover(t *testing.T) {
|
||||
ct := NewCommonTopo(t)
|
||||
ct := newCommonTopo(t, "dc2", true, true)
|
||||
s := &ac5_2PQFailoverSuite{}
|
||||
s.setup(t, ct)
|
||||
ct.Launch(t)
@ -14,140 +14,341 @@ import (
|
|||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
)
|
||||
|
||||
// note: unlike other *Suite structs that are per-peering direction,
|
||||
// this one is special and does all directions itself, because the
|
||||
// setup is not exactly symmetrical
|
||||
type ac6FailoversSuite struct {
|
||||
ac6 map[nodeKey]ac6FailoversContext
|
||||
}
|
||||
type ac6FailoversContext struct {
|
||||
clientSID topology.ServiceID
|
||||
serverSID topology.ServiceID
|
||||
// inputs
|
||||
// with all false, this gives us a scenario with:
|
||||
// - a "near" server in the accepter cluster (DC1), partitition default, namespace default
|
||||
// - a "far" server in the dialer cluster (DC2), partition default, namespace default
|
||||
// - a client in the accepter cluster (DC1), partition default, namespace default, with:
|
||||
// - upstream near server (DC1)
|
||||
// - failover to far server (DC2)
|
||||
//
|
||||
// TODO: technically if NearInDial && !FarInAcc (i.e., near == far), then we're not doing peering at all,
|
||||
// and could do this test in a single DC
|
||||
|
||||
// when true, put the client (and its default upstream server) in the dialer peer; otherwise, put client in accepter
|
||||
NearInDial bool
|
||||
// when true, put the client (and its default upstream server) in the nondefault partition/namespace; otherwise in the default
|
||||
NearInPartAlt bool
|
||||
NearInNSAlt bool
|
||||
// when true, put far server to the accepter peer; otherwise the dialer
|
||||
FarInAcc bool
|
||||
// when true, put far server to nondefault partition/namespace (ENT-only); otherwise, failover to default
|
||||
FarInPartAlt bool
|
||||
FarInNSAlt bool
|
||||
|
||||
// launch outputs, for querying during test
|
||||
clientSID topology.ServiceID
|
||||
// near = same DC as client; far = other DC
|
||||
nearServerSID topology.ServiceID
|
||||
// used to remove the node and trigger failover
|
||||
serverNode topology.NodeID
|
||||
}
|
||||
type nodeKey struct {
|
||||
dc string
|
||||
partition string
|
||||
nearServerNode topology.NodeID
|
||||
farServerSID topology.ServiceID
|
||||
farServerNode topology.NodeID
|
||||
}
|
||||
|
||||
// Note: this test cannot share topo
|
||||
func TestAC6Failovers(t *testing.T) {
|
||||
// bit banging to get all permutations of all params
|
||||
const nParams = 3
|
||||
// i.e 2**nParams
|
||||
const n = int(1) << nParams
|
||||
for i := 0; i < n; i++ {
|
||||
s := ac6FailoversSuite{
|
||||
// xth bit == 1
|
||||
NearInDial: (i>>0)&1 == 1,
|
||||
NearInPartAlt: (i>>1)&1 == 1,
|
||||
FarInPartAlt: (i>>2)&1 == 1,
|
||||
}
|
||||
// ensure the servers are always in separate DCs
|
||||
s.FarInAcc = s.NearInDial
|
||||
t.Run(fmt.Sprintf("%02d_%s", i, s.testName()), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ct := NewCommonTopo(t)
|
||||
s := &ac6FailoversSuite{}
|
||||
s.setup(t, ct)
|
||||
ct.Launch(t)
|
||||
s.test(t, ct)
|
||||
})
|
||||
}
|
||||
}
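The loop in TestAC6Failovers above enumerates every combination of its boolean inputs by treating the loop index as a bit mask; a minimal standalone sketch of that technique (function and variable names are illustrative, not from the repo):

package main

import "fmt"

// enumerateFlags returns all 2**nFlags combinations of booleans by reading
// bit x of the loop index as flag x.
func enumerateFlags(nFlags int) [][]bool {
	n := 1 << nFlags
	out := make([][]bool, 0, n)
	for i := 0; i < n; i++ {
		flags := make([]bool, nFlags)
		for bit := 0; bit < nFlags; bit++ {
			flags[bit] = (i>>bit)&1 == 1
		}
		out = append(out, flags)
	}
	return out
}

func main() {
	for i, flags := range enumerateFlags(3) {
		fmt.Printf("%02d %v\n", i, flags)
	}
}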
|
||||
|
||||
func TestNET5029Failovers(t *testing.T) {
|
||||
// TODO: *.{a,b} are not actually peering tests, and should technically be moved elsewhere
|
||||
suites := map[string]ac6FailoversSuite{
|
||||
"1.a": {
|
||||
FarInAcc: true,
|
||||
FarInPartAlt: true,
|
||||
},
|
||||
"1.b": {
|
||||
FarInAcc: true,
|
||||
FarInNSAlt: true,
|
||||
},
|
||||
"1.c": {
|
||||
FarInNSAlt: true,
|
||||
},
|
||||
"1.d": {
|
||||
FarInPartAlt: true,
|
||||
},
|
||||
"2.a": {
|
||||
FarInAcc: true,
|
||||
NearInPartAlt: true,
|
||||
},
|
||||
"2.b": {
|
||||
FarInAcc: true,
|
||||
NearInNSAlt: true,
|
||||
},
|
||||
"2.c": {
|
||||
NearInDial: true,
|
||||
NearInNSAlt: true,
|
||||
FarInAcc: true,
|
||||
},
|
||||
"2.d": {
|
||||
NearInDial: true,
|
||||
NearInPartAlt: true,
|
||||
FarInAcc: true,
|
||||
},
|
||||
}
|
||||
for name, s := range suites {
|
||||
s := s
|
||||
t.Run(fmt.Sprintf("%s_%s", name, s.testName()), func(t *testing.T) {
|
||||
if name == "1.b" {
|
||||
t.Skip("TODO: fails with 503/504")
|
||||
}
|
||||
t.Parallel()
|
||||
ct := NewCommonTopo(t)
|
||||
s.setup(t, ct)
|
||||
ct.Launch(t)
|
||||
s.test(t, ct)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAC6Failovers_AllPermutations(t *testing.T) {
|
||||
//
|
||||
t.Skip("Too many permutations")
|
||||
// bit banging to get all permutations of all params
|
||||
const nParams = 6
|
||||
// i.e 2**nParams
|
||||
const n = int(1) << nParams
|
||||
for i := 0; i < n; i++ {
|
||||
s := ac6FailoversSuite{
|
||||
// xth bit == 1
|
||||
NearInDial: (i>>0)&1 == 1,
|
||||
FarInAcc: (i>>1)&1 == 1,
|
||||
NearInPartAlt: (i>>2)&1 == 1,
|
||||
FarInPartAlt: (i>>3)&1 == 1,
|
||||
NearInNSAlt: (i>>4)&1 == 1,
|
||||
FarInNSAlt: (i>>5)&1 == 1,
|
||||
}
|
||||
t.Run(fmt.Sprintf("%02d_%s", i, s.testName()), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ct := NewCommonTopo(t)
|
||||
s.setup(t, ct)
|
||||
ct.Launch(t)
|
||||
s.test(t, ct)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ac6FailoversSuite) testName() (ret string) {
|
||||
switch s.NearInDial {
|
||||
case true:
|
||||
ret += "dial"
|
||||
default:
|
||||
ret += "acc"
|
||||
}
|
||||
ret += "."
|
||||
switch s.NearInPartAlt {
|
||||
case true:
|
||||
ret += "alt"
|
||||
default:
|
||||
ret += "default"
|
||||
}
|
||||
ret += "."
|
||||
switch s.NearInNSAlt {
|
||||
case true:
|
||||
ret += "alt"
|
||||
default:
|
||||
ret += "default"
|
||||
}
|
||||
|
||||
ret += "->"
|
||||
|
||||
switch s.FarInAcc {
|
||||
case true:
|
||||
ret += "acc"
|
||||
default:
|
||||
ret += "dial"
|
||||
}
|
||||
ret += "."
|
||||
switch s.FarInPartAlt {
|
||||
case true:
|
||||
ret += "alt"
|
||||
default:
|
||||
ret += "default"
|
||||
}
|
||||
ret += "."
|
||||
switch s.FarInNSAlt {
|
||||
case true:
|
||||
ret += "alt"
|
||||
default:
|
||||
ret += "default"
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *ac6FailoversSuite) setup(t *testing.T, ct *commonTopo) {
|
||||
// TODO: update setups to loop through a cluster's partitions+namespaces internally
|
||||
s.setupAC6Failovers(ct, ct.DC1, ct.DC2)
|
||||
s.setupAC6Failovers(ct, ct.DC2, ct.DC1)
|
||||
s.setupAC6FailoversDC3(ct, ct.DC3, ct.DC1, ct.DC2)
|
||||
if !utils.IsEnterprise() && (s.NearInPartAlt || s.FarInPartAlt) {
|
||||
t.Skip("ENT required for nondefault partitions")
|
||||
}
|
||||
|
||||
// dc1 is peered with dc2 and dc3.
|
||||
// dc1 has an ac6-client in "default" and "part1" partitions (only default in CE).
|
||||
// ac6-client has a single upstream ac6-failover-svc in its respective partition^.
|
||||
//
|
||||
// ac6-failover-svc has the following failovers:
|
||||
// - peer-dc2-default
|
||||
// - peer-dc2-part1 (not in CE)
|
||||
// - peer-dc3-default
|
||||
//
|
||||
// This setup is mirrored from dc2->dc1 as well
|
||||
// (both dcs have dc3 as the last failover target)
|
||||
//
|
||||
// ^NOTE: There are no cross-partition upstreams because MeshGatewayMode = local
|
||||
// and failover information gets stripped out by the mesh gateways so we
|
||||
// can't test failovers.
|
||||
func (s *ac6FailoversSuite) setupAC6Failovers(ct *commonTopo, clu, peerClu *topology.Cluster) {
|
||||
for _, part := range clu.Partitions {
|
||||
partition := part.Name
|
||||
|
||||
// There is a peering per partition in the peered cluster
|
||||
var peers []string
|
||||
for _, peerPart := range peerClu.Partitions {
|
||||
peers = append(peers, LocalPeerName(peerClu, peerPart.Name))
|
||||
nearClu := ct.DC1
|
||||
farClu := ct.DC2
|
||||
if s.NearInDial {
|
||||
nearClu = ct.DC2
|
||||
}
|
||||
if s.FarInAcc {
|
||||
farClu = ct.DC1
|
||||
}
|
||||
|
||||
// Make an HTTP server with various failover targets
|
||||
serverSID := topology.ServiceID{
|
||||
Name: "ac6-failover-svc",
|
||||
Partition: partition,
|
||||
// - server in clientPartition/DC (main target)
|
||||
nearServerSID := topology.ServiceID{
|
||||
Name: "ac6-server",
|
||||
Partition: ConfigEntryPartition("default"),
|
||||
Namespace: "default",
|
||||
}
|
||||
server := NewFortioServiceWithDefaults(
|
||||
clu.Datacenter,
|
||||
serverSID,
|
||||
if s.NearInPartAlt {
|
||||
nearServerSID.Partition = "part1"
|
||||
}
|
||||
if s.NearInNSAlt {
|
||||
nearServerSID.Namespace = "ns1"
|
||||
}
|
||||
nearServer := NewFortioServiceWithDefaults(
|
||||
nearClu.Datacenter,
|
||||
nearServerSID,
|
||||
nil,
|
||||
)
|
||||
// Export to all known peers
|
||||
ct.ExportService(clu, partition,
|
||||
api.ExportedService{
|
||||
Name: server.ID.Name,
|
||||
Consumers: func() []api.ServiceConsumer {
|
||||
var consumers []api.ServiceConsumer
|
||||
for _, peer := range peers {
|
||||
consumers = append(consumers, api.ServiceConsumer{
|
||||
Peer: peer,
|
||||
})
|
||||
}
|
||||
return consumers
|
||||
}(),
|
||||
},
|
||||
)
|
||||
serverNode := ct.AddServiceNode(clu, serviceExt{Service: server})
|
||||
nearServerNode := ct.AddServiceNode(nearClu, serviceExt{Service: nearServer})
|
||||
|
||||
clu.InitialConfigEntries = append(clu.InitialConfigEntries,
|
||||
nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries,
|
||||
&api.ServiceConfigEntry{
|
||||
Kind: api.ServiceDefaults,
|
||||
Name: server.ID.Name,
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
Name: nearServerSID.Name,
|
||||
Partition: ConfigEntryPartition(nearServerSID.Partition),
|
||||
Namespace: nearServerSID.Namespace,
|
||||
Protocol: "http",
|
||||
},
|
||||
)
|
||||
// - server in otherPartition/otherDC
|
||||
farServerSID := topology.ServiceID{
|
||||
Name: nearServerSID.Name,
|
||||
Partition: "default",
|
||||
Namespace: "default",
|
||||
}
|
||||
if s.FarInPartAlt {
|
||||
farServerSID.Partition = "part1"
|
||||
}
|
||||
if s.FarInNSAlt {
|
||||
farServerSID.Namespace = "ns1"
|
||||
}
|
||||
farServer := NewFortioServiceWithDefaults(
|
||||
farClu.Datacenter,
|
||||
farServerSID,
|
||||
nil,
|
||||
)
|
||||
farServerNode := ct.AddServiceNode(farClu, serviceExt{Service: farServer})
|
||||
if nearClu != farClu {
|
||||
ct.ExportService(farClu, farServerSID.Partition,
|
||||
api.ExportedService{
|
||||
Name: farServerSID.Name,
|
||||
Namespace: farServerSID.Namespace,
|
||||
Consumers: []api.ServiceConsumer{
|
||||
{
|
||||
Peer: LocalPeerName(nearClu, nearServerSID.Partition),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
} else if nearClu == farClu && farServerSID.Partition != nearServerSID.Partition {
|
||||
ct.ExportService(farClu, farServerSID.Partition,
|
||||
api.ExportedService{
|
||||
Name: farServerSID.Name,
|
||||
Namespace: farServerSID.Namespace,
|
||||
Consumers: []api.ServiceConsumer{
|
||||
{
|
||||
// this must not be "", or else it is basically ignored altogether
|
||||
// TODO: bug? if this whole struct is empty, that should be an error
|
||||
Partition: topology.PartitionOrDefault(nearServerSID.Partition),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
var targets []api.ServiceResolverFailoverTarget
|
||||
if nearClu != farClu {
|
||||
targets = []api.ServiceResolverFailoverTarget{
|
||||
{
|
||||
Service: farServerSID.Name,
|
||||
Peer: LocalPeerName(farClu, farServerSID.Partition),
|
||||
Namespace: farServerSID.Namespace,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
part := ConfigEntryPartition(farServerSID.Partition)
|
||||
// weird exception here where target partition set to "" means "inherit from parent"
|
||||
// TODO: bug? docs say "" -> default:
|
||||
// https://developer.hashicorp.com/consul/docs/connect/config-entries/service-resolver#failover-targets-partition
|
||||
if farServerSID.Partition == "default" && nearServerSID.Partition != "default" {
|
||||
part = "default"
|
||||
}
|
||||
targets = []api.ServiceResolverFailoverTarget{
|
||||
{
|
||||
Service: farServerSID.Name,
|
||||
Partition: part,
|
||||
Namespace: farServerSID.Namespace,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries,
|
||||
&api.ServiceConfigEntry{
|
||||
Kind: api.ServiceDefaults,
|
||||
Name: farServerSID.Name,
|
||||
Partition: ConfigEntryPartition(farServerSID.Partition),
|
||||
Namespace: farServerSID.Namespace,
|
||||
Protocol: "http",
|
||||
},
|
||||
&api.ServiceResolverConfigEntry{
|
||||
Kind: api.ServiceResolver,
|
||||
Name: server.ID.Name,
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
Name: nearServerSID.Name,
|
||||
Partition: ConfigEntryPartition(nearServerSID.Partition),
|
||||
Namespace: nearServerSID.Namespace,
|
||||
Failover: map[string]api.ServiceResolverFailover{
|
||||
"*": {
|
||||
Targets: func() []api.ServiceResolverFailoverTarget {
|
||||
// Make a failover target for every partition in the peer cluster
|
||||
var targets []api.ServiceResolverFailoverTarget
|
||||
for _, peer := range peers {
|
||||
targets = append(targets, api.ServiceResolverFailoverTarget{
|
||||
Peer: peer,
|
||||
})
|
||||
}
|
||||
// Just hard code default partition for dc3, since the exhaustive
|
||||
// testing will be done against dc2.
|
||||
targets = append(targets, api.ServiceResolverFailoverTarget{
|
||||
Peer: "peer-dc3-default",
|
||||
})
|
||||
return targets
|
||||
}(),
|
||||
Targets: targets,
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// Make client which will dial server
|
||||
clientSID := topology.ServiceID{
|
||||
Name: "ac6-client",
|
||||
Partition: partition,
|
||||
Partition: nearServerSID.Partition,
|
||||
Namespace: nearServerSID.Namespace,
|
||||
}
|
||||
client := NewFortioServiceWithDefaults(
|
||||
clu.Datacenter,
|
||||
nearClu.Datacenter,
|
||||
clientSID,
|
||||
func(s *topology.Service) {
|
||||
// Upstream per partition
|
||||
s.Upstreams = []*topology.Upstream{
|
||||
{
|
||||
ID: topology.ServiceID{
|
||||
Name: server.ID.Name,
|
||||
Partition: part.Name,
|
||||
Name: nearServerSID.Name,
|
||||
Partition: nearServerSID.Partition,
|
||||
Namespace: nearServerSID.Namespace,
|
||||
},
|
||||
LocalPort: 5000,
|
||||
// exposed so we can hit it directly
@ -157,276 +358,109 @@ func (s *ac6FailoversSuite) setupAC6Failovers(ct *commonTopo, clu, peerClu *topo
|
|||
}
|
||||
},
|
||||
)
|
||||
ct.ExportService(clu, partition,
|
||||
api.ExportedService{
|
||||
Name: client.ID.Name,
|
||||
Consumers: func() []api.ServiceConsumer {
|
||||
var consumers []api.ServiceConsumer
|
||||
// Export to each peer
|
||||
for _, peer := range peers {
|
||||
consumers = append(consumers, api.ServiceConsumer{
|
||||
Peer: peer,
|
||||
})
|
||||
}
|
||||
return consumers
|
||||
}(),
|
||||
},
|
||||
)
|
||||
ct.AddServiceNode(clu, serviceExt{Service: client})
|
||||
|
||||
clu.InitialConfigEntries = append(clu.InitialConfigEntries,
|
||||
ct.AddServiceNode(nearClu, serviceExt{Service: client})
|
||||
nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries,
|
||||
&api.ServiceConfigEntry{
|
||||
Kind: api.ServiceDefaults,
|
||||
Name: client.ID.Name,
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
Name: clientSID.Name,
|
||||
Partition: ConfigEntryPartition(clientSID.Partition),
|
||||
Namespace: clientSID.Namespace,
|
||||
Protocol: "http",
|
||||
},
|
||||
)
|
||||
|
||||
// Add intention allowing local and peered clients to call server
|
||||
clu.InitialConfigEntries = append(clu.InitialConfigEntries,
|
||||
// intentions
|
||||
nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries,
|
||||
&api.ServiceIntentionsConfigEntry{
|
||||
Kind: api.ServiceIntentions,
|
||||
Name: server.ID.Name,
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
// SourceIntention for local client and peered clients
|
||||
Sources: func() []*api.SourceIntention {
|
||||
ixns := []*api.SourceIntention{
|
||||
{
|
||||
Name: client.ID.Name,
|
||||
Partition: ConfigEntryPartition(part.Name),
|
||||
Name: nearServerSID.Name,
|
||||
Partition: ConfigEntryPartition(nearServerSID.Partition),
|
||||
Namespace: nearServerSID.Namespace,
|
||||
Sources: []*api.SourceIntention{{
|
||||
Name: clientSID.Name,
|
||||
Namespace: clientSID.Namespace,
|
||||
// in this field, "" -> destination partition, so no ConfigEntryPartition :eyeroll:
|
||||
// https://developer.hashicorp.com/consul/docs/connect/config-entries/service-intentions#sources-partition
|
||||
Partition: topology.PartitionOrDefault(clientSID.Partition),
|
||||
Action: api.IntentionActionAllow,
|
||||
},
|
||||
}
|
||||
for _, peer := range peers {
|
||||
ixns = append(ixns, &api.SourceIntention{
|
||||
Name: client.ID.Name,
|
||||
Peer: peer,
|
||||
Action: api.IntentionActionAllow,
|
||||
})
|
||||
}
|
||||
return ixns
|
||||
}(),
|
||||
}},
|
||||
},
|
||||
)
|
||||
if s.ac6 == nil {
|
||||
s.ac6 = map[nodeKey]ac6FailoversContext{}
|
||||
farSource := api.SourceIntention{
|
||||
Name: clientSID.Name,
|
||||
Namespace: clientSID.Namespace,
|
||||
Peer: LocalPeerName(nearClu, clientSID.Partition),
|
||||
Action: api.IntentionActionAllow,
|
||||
}
|
||||
s.ac6[nodeKey{clu.Datacenter, partition}] = struct {
|
||||
clientSID topology.ServiceID
|
||||
serverSID topology.ServiceID
|
||||
serverNode topology.NodeID
|
||||
}{
|
||||
clientSID: clientSID,
|
||||
serverSID: serverSID,
|
||||
serverNode: serverNode.ID(),
|
||||
if nearClu == farClu {
|
||||
farSource.Peer = ""
|
||||
// in this field, "" -> destination partition, so no ConfigEntryPartition :eyeroll:
|
||||
// https://developer.hashicorp.com/consul/docs/connect/config-entries/service-intentions#sources-partition
|
||||
farSource.Partition = topology.PartitionOrDefault(clientSID.Partition)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ac6FailoversSuite) setupAC6FailoversDC3(ct *commonTopo, clu, peer1, peer2 *topology.Cluster) {
|
||||
var peers []string
|
||||
for _, part := range peer1.Partitions {
|
||||
peers = append(peers, LocalPeerName(peer1, part.Name))
|
||||
}
|
||||
for _, part := range peer2.Partitions {
|
||||
peers = append(peers, LocalPeerName(peer2, part.Name))
|
||||
}
|
||||
|
||||
partition := "default"
|
||||
|
||||
// Make an HTTP server
|
||||
server := NewFortioServiceWithDefaults(
|
||||
clu.Datacenter,
|
||||
topology.ServiceID{
|
||||
Name: "ac6-failover-svc",
|
||||
Partition: partition,
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
ct.AddServiceNode(clu, serviceExt{
|
||||
Service: server,
|
||||
Config: &api.ServiceConfigEntry{
|
||||
Kind: api.ServiceDefaults,
|
||||
Name: server.ID.Name,
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
Protocol: "http",
|
||||
},
|
||||
Intentions: &api.ServiceIntentionsConfigEntry{
|
||||
farClu.InitialConfigEntries = append(farClu.InitialConfigEntries,
|
||||
&api.ServiceIntentionsConfigEntry{
|
||||
Kind: api.ServiceIntentions,
|
||||
Name: server.ID.Name,
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
Sources: func() []*api.SourceIntention {
|
||||
var ixns []*api.SourceIntention
|
||||
for _, peer := range peers {
|
||||
ixns = append(ixns, &api.SourceIntention{
|
||||
Name: "ac6-client",
|
||||
Peer: peer,
|
||||
Action: api.IntentionActionAllow,
|
||||
})
|
||||
}
|
||||
return ixns
|
||||
}(),
|
||||
Name: farServerSID.Name,
|
||||
Partition: ConfigEntryPartition(farServerSID.Partition),
|
||||
Namespace: farServerSID.Namespace,
|
||||
Sources: []*api.SourceIntention{&farSource},
|
||||
},
|
||||
Exports: func() []api.ServiceConsumer {
|
||||
var consumers []api.ServiceConsumer
|
||||
for _, peer := range peers {
|
||||
consumers = append(consumers, api.ServiceConsumer{
|
||||
Peer: peer,
|
||||
})
|
||||
}
|
||||
return consumers
|
||||
}(),
|
||||
})
|
||||
)
|
||||
|
||||
s.clientSID = clientSID
|
||||
s.nearServerSID = nearServerSID
|
||||
s.farServerSID = farServerSID
|
||||
s.nearServerNode = nearServerNode.ID()
|
||||
s.farServerNode = farServerNode.ID()
|
||||
}
|
||||
|
||||
func (s *ac6FailoversSuite) test(t *testing.T, ct *commonTopo) {
|
||||
dc1 := ct.Sprawl.Topology().Clusters["dc1"]
|
||||
dc2 := ct.Sprawl.Topology().Clusters["dc2"]
|
||||
|
||||
type testcase struct {
|
||||
name string
|
||||
cluster *topology.Cluster
|
||||
peer *topology.Cluster
|
||||
partition string
|
||||
}
|
||||
tcs := []testcase{
|
||||
{
|
||||
name: "dc1 default partition failovers",
|
||||
cluster: dc1,
|
||||
peer: dc2, // dc3 is hardcoded
|
||||
partition: "default",
|
||||
},
|
||||
{
|
||||
name: "dc1 part1 partition failovers",
|
||||
cluster: dc1,
|
||||
peer: dc2, // dc3 is hardcoded
|
||||
partition: "part1",
|
||||
},
|
||||
{
|
||||
name: "dc2 default partition failovers",
|
||||
cluster: dc2,
|
||||
peer: dc1, // dc3 is hardcoded
|
||||
partition: "default",
|
||||
},
|
||||
{
|
||||
name: "dc2 part1 partition failovers",
|
||||
cluster: dc2,
|
||||
peer: dc1, // dc3 is hardcoded
|
||||
partition: "part1",
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// NOTE: *not parallel* because we mutate resources that are shared
|
||||
// between test cases (disable/enable nodes)
|
||||
if !utils.IsEnterprise() && tc.partition != "default" {
|
||||
t.Skip("skipping enterprise test")
|
||||
|
||||
nearClu := ct.Sprawl.Topology().Clusters["dc1"]
|
||||
farClu := ct.Sprawl.Topology().Clusters["dc2"]
|
||||
if s.NearInDial {
|
||||
nearClu = ct.Sprawl.Topology().Clusters["dc2"]
|
||||
}
|
||||
if s.FarInAcc {
|
||||
farClu = ct.Sprawl.Topology().Clusters["dc1"]
|
||||
}
|
||||
partition := tc.partition
|
||||
clu := tc.cluster
|
||||
peerClu := tc.peer
|
||||
|
||||
svcs := clu.ServicesByID(s.ac6[nodeKey{clu.Datacenter, partition}].clientSID)
|
||||
svcs := nearClu.ServicesByID(s.clientSID)
|
||||
require.Len(t, svcs, 1, "expected exactly one client in datacenter")
|
||||
|
||||
serverSID := s.ac6[nodeKey{clu.Datacenter, partition}].serverSID
|
||||
serverSID.Normalize()
|
||||
|
||||
client := svcs[0]
|
||||
require.Len(t, client.Upstreams, 1, "expected one upstream for client")
|
||||
|
||||
u := client.Upstreams[0]
|
||||
ct.Assert.CatalogServiceExists(t, clu.Name, u.ID.Name, utils.CompatQueryOpts(&api.QueryOptions{
|
||||
Partition: u.ID.Partition,
|
||||
}))
|
||||
|
||||
t.Cleanup(func() {
|
||||
cfg := ct.Sprawl.Config()
|
||||
for _, part := range clu.Partitions {
|
||||
EnableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, part.Name}].serverNode)
|
||||
}
|
||||
for _, part := range peerClu.Partitions {
|
||||
EnableNode(t, cfg, peerClu.Name, s.ac6[nodeKey{peerClu.Datacenter, part.Name}].serverNode)
|
||||
}
|
||||
require.NoError(t, ct.Sprawl.Relaunch(cfg))
|
||||
})
|
||||
upstream := client.Upstreams[0]
|
||||
|
||||
fmt.Println("### preconditions")
|
||||
// TODO: deduce this number, instead of hard-coding
|
||||
nFailoverTargets := 4
|
||||
// in CE, we don't have failover targets for non-default partitions
|
||||
if !utils.IsEnterprise() {
|
||||
nFailoverTargets = 3
|
||||
}
|
||||
for i := 0; i < nFailoverTargets; i++ {
|
||||
ct.Assert.UpstreamEndpointStatus(t, client, fmt.Sprintf("failover-target~%d~%s", i, clusterPrefix(u, clu.Datacenter)), "HEALTHY", 1)
|
||||
}
|
||||
|
||||
ct.Assert.FortioFetch2FortioName(t, client, u, clu.Name, serverSID)
|
||||
// this is the server in the same DC and partitions as client
|
||||
serverSID := s.nearServerSID
|
||||
serverSID.Normalize()
|
||||
ct.Assert.FortioFetch2FortioName(t, client, upstream, nearClu.Name, serverSID)
|
||||
|
||||
ct.Assert.CatalogServiceExists(t, nearClu.Name, upstream.ID.Name, utils.CompatQueryOpts(&api.QueryOptions{
|
||||
Partition: upstream.ID.Partition,
|
||||
Namespace: upstream.ID.Namespace,
|
||||
}))
|
||||
|
||||
if t.Failed() {
|
||||
t.Fatalf("failed preconditions")
|
||||
t.Fatal("failed preconditions")
|
||||
}
|
||||
|
||||
fmt.Println("### Failover to peer target")
|
||||
fmt.Println("### failover")
|
||||
|
||||
cfg := ct.Sprawl.Config()
|
||||
DisableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, partition}].serverNode)
|
||||
require.NoError(t, ct.Sprawl.Relaunch(cfg))
|
||||
DisableNode(t, cfg, nearClu.Name, s.nearServerNode)
|
||||
require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, "failover"))
|
||||
// Clusters for imported services rely on outlier detection for
|
||||
// failovers, NOT eds_health_status. This means that killing the
|
||||
// node above does not actually make the envoy cluster UNHEALTHY
|
||||
// so we do not assert for it.
|
||||
expectUID := topology.ServiceID{
|
||||
Name: u.ID.Name,
|
||||
Partition: "default",
|
||||
}
|
||||
expectUID.Normalize()
|
||||
ct.Assert.FortioFetch2FortioName(t, client, u, peerClu.Name, expectUID)
|
||||
|
||||
if utils.IsEnterprise() {
|
||||
fmt.Println("### Failover to peer target in non-default partition")
|
||||
cfg = ct.Sprawl.Config()
|
||||
DisableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, partition}].serverNode)
|
||||
DisableNode(t, cfg, peerClu.Name, s.ac6[nodeKey{peerClu.Datacenter, "default"}].serverNode)
|
||||
require.NoError(t, ct.Sprawl.Relaunch(cfg))
|
||||
// Retry until outlier_detection deems the cluster
|
||||
// unhealthy and fails over to peer part1.
|
||||
expectUID = topology.ServiceID{
|
||||
Name: u.ID.Name,
|
||||
Partition: "part1",
|
||||
}
|
||||
expectUID.Normalize()
|
||||
ct.Assert.FortioFetch2FortioName(t, client, u, peerClu.Name, expectUID)
|
||||
}
|
||||
|
||||
fmt.Println("### Failover to dc3 peer target")
|
||||
cfg = ct.Sprawl.Config()
|
||||
DisableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, partition}].serverNode)
|
||||
// Disable all partitions for peer
|
||||
for _, part := range peerClu.Partitions {
|
||||
DisableNode(t, cfg, peerClu.Name, s.ac6[nodeKey{peerClu.Datacenter, part.Name}].serverNode)
|
||||
}
|
||||
require.NoError(t, ct.Sprawl.Relaunch(cfg))
|
||||
// This will retry until outlier_detection deems the cluster
|
||||
// unhealthy and fails over to dc3.
|
||||
expectUID = topology.ServiceID{
|
||||
Name: u.ID.Name,
|
||||
Partition: "default",
|
||||
}
|
||||
expectUID.Normalize()
|
||||
ct.Assert.FortioFetch2FortioName(t, client, u, "dc3", expectUID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func clusterPrefix(u *topology.Upstream, dc string) string {
|
||||
u.ID.Normalize()
|
||||
switch u.ID.Partition {
|
||||
case "default":
|
||||
return fmt.Sprintf("%s.%s.%s.internal", u.ID.Name, u.ID.Namespace, dc)
|
||||
default:
|
||||
return fmt.Sprintf("%s.%s.%s.%s.internal-v1", u.ID.Name, u.ID.Namespace, u.ID.Partition, dc)
|
||||
}
|
||||
expectSID := s.farServerSID
|
||||
expectSID.Normalize()
|
||||
ct.Assert.FortioFetch2FortioName(t, client, upstream, farClu.Name, expectSID)
|
||||
}
@ -11,8 +11,6 @@ import (
|
|||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/test-integ/topoutil"
|
||||
)
|
||||
|
||||
// TestRotateGW ensures that peered services continue to be able to talk to their
@ -143,10 +141,10 @@ func (s *suiteRotateGW) setup(t *testing.T, ct *commonTopo) {
|
|||
// add a second mesh gateway "new"
|
||||
s.newMGWNodeName = fmt.Sprintf("new-%s-default-mgw", clu.Name)
|
||||
nodeKind := topology.NodeKindClient
|
||||
if clu.Datacenter == agentlessDC {
|
||||
if clu.Datacenter == ct.agentlessDC {
|
||||
nodeKind = topology.NodeKindDataplane
|
||||
}
|
||||
clu.Nodes = append(clu.Nodes, topoutil.NewTopologyMeshGatewaySet(
|
||||
_, mgwNodes := newTopologyMeshGatewaySet(
|
||||
nodeKind,
|
||||
"default",
|
||||
s.newMGWNodeName,
@ -155,7 +153,8 @@ func (s *suiteRotateGW) setup(t *testing.T, ct *commonTopo) {
|
|||
func(i int, node *topology.Node) {
|
||||
node.Disabled = true
|
||||
},
|
||||
)...)
|
||||
)
|
||||
clu.Nodes = append(clu.Nodes, mgwNodes...)
|
||||
}
|
||||
|
||||
func (s *suiteRotateGW) test(t *testing.T, ct *commonTopo) {
@ -8,12 +8,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
"github.com/mitchellh/copystructure"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
@ -165,7 +166,7 @@ func (s *ac7_2RotateLeaderSuite) test(t *testing.T, ct *commonTopo) {
|
|||
found := 0
|
||||
foundI := 0
|
||||
for i, svc := range ceAsES.Services {
|
||||
if svc.Name == s.sidServer.Name && svc.Namespace == utils.DefaultToEmpty(s.sidServer.Namespace) {
|
||||
if svc.Name == s.sidServer.Name && utils.DefaultToEmpty(svc.Namespace) == utils.DefaultToEmpty(s.sidServer.Namespace) {
|
||||
found += 1
|
||||
foundI = i
|
||||
}
@ -40,6 +40,7 @@ type commonTopo struct {
|
|||
// shortcuts to corresponding entry in Cfg
|
||||
DC1 *topology.Cluster
|
||||
DC2 *topology.Cluster
|
||||
// nil if includeDC3 is false
|
||||
DC3 *topology.Cluster
|
||||
|
||||
// set after Launch. Should be considered read-only
@ -48,66 +49,101 @@ type commonTopo struct {
|
|||
|
||||
// track per-DC services to prevent duplicates
|
||||
services map[string]map[topology.ServiceID]struct{}
|
||||
|
||||
// if zero, no DCs are agentless
|
||||
agentlessDC string
|
||||
|
||||
// if true, create DC3 and associated links (currently only used by ac5.2)
|
||||
includeDC3 bool
|
||||
|
||||
peerThroughMGW bool
|
||||
}
|
||||
|
||||
const agentlessDC = "dc2"
|
||||
func NewCommonTopoWithoutAgentless(t *testing.T) *commonTopo {
|
||||
t.Helper()
|
||||
return newCommonTopo(t, "", false, true)
|
||||
}
|
||||
|
||||
func NewCommonTopo(t *testing.T) *commonTopo {
|
||||
t.Helper()
|
||||
return newCommonTopo(t, "dc2", false, true)
|
||||
}
|
||||
|
||||
ct := commonTopo{}
|
||||
func newCommonTopo(t *testing.T, agentlessDC string, includeDC3 bool, peerThroughMGW bool) *commonTopo {
|
||||
t.Helper()
|
||||
|
||||
ct := commonTopo{
|
||||
agentlessDC: agentlessDC,
|
||||
includeDC3: includeDC3,
|
||||
peerThroughMGW: peerThroughMGW,
|
||||
}
|
||||
|
||||
const nServers = 3
|
||||
|
||||
// Make 3-server clusters in dc1 and dc2
|
||||
// For simplicity, the Name and Datacenter of the clusters are the same.
|
||||
// dc1 and dc2 should be symmetric.
|
||||
dc1 := clusterWithJustServers("dc1", nServers)
|
||||
dc1 := ct.clusterWithJustServers("dc1", nServers)
|
||||
ct.DC1 = dc1
|
||||
dc2 := clusterWithJustServers("dc2", nServers)
|
||||
dc2 := ct.clusterWithJustServers("dc2", nServers)
|
||||
ct.DC2 = dc2
|
||||
clusters := []*topology.Cluster{dc1, dc2}
|
||||
|
||||
var dc3 *topology.Cluster
|
||||
|
||||
if ct.includeDC3 {
|
||||
// dc3 is a failover cluster for both dc1 and dc2
|
||||
dc3 := clusterWithJustServers("dc3", 1)
|
||||
dc3 = ct.clusterWithJustServers("dc3", 1)
|
||||
// dc3 is only used for certain failover scenarios and does not need tenancies
|
||||
dc3.Partitions = []*topology.Partition{{Name: "default"}}
|
||||
ct.DC3 = dc3
|
||||
|
||||
injectTenancies(dc1)
|
||||
injectTenancies(dc2)
|
||||
// dc3 is only used for certain failover scenarios and does not need tenancies
|
||||
dc3.Partitions = []*topology.Partition{{Name: "default"}}
|
||||
|
||||
clusters = append(clusters, dc3)
|
||||
}
|
||||
|
||||
injectTenancies(dc1)
|
||||
injectTenancies(dc2)
|
||||
// dc3 doesn't get tenancies
|
||||
|
||||
ct.services = map[string]map[topology.ServiceID]struct{}{}
|
||||
for _, dc := range []*topology.Cluster{dc1, dc2, dc3} {
|
||||
for _, dc := range clusters {
|
||||
ct.services[dc.Datacenter] = map[topology.ServiceID]struct{}{}
|
||||
}
|
||||
|
||||
peerings := addPeerings(dc1, dc2)
|
||||
if ct.includeDC3 {
|
||||
peerings = append(peerings, addPeerings(dc1, dc3)...)
|
||||
peerings = append(peerings, addPeerings(dc2, dc3)...)
|
||||
}
|
||||
|
||||
addMeshGateways(dc1)
|
||||
addMeshGateways(dc2)
|
||||
addMeshGateways(dc3)
|
||||
ct.addMeshGateways(dc1)
|
||||
ct.addMeshGateways(dc2)
|
||||
if ct.includeDC3 {
|
||||
ct.addMeshGateways(dc3)
|
||||
}
|
||||
|
||||
setupGlobals(dc1)
|
||||
setupGlobals(dc2)
|
||||
setupGlobals(dc3)
|
||||
ct.setupGlobals(dc1)
|
||||
ct.setupGlobals(dc2)
|
||||
if ct.includeDC3 {
|
||||
ct.setupGlobals(dc3)
|
||||
}
|
||||
|
||||
networks := []*topology.Network{
|
||||
{Name: "wan", Type: "wan"},
|
||||
{Name: dc1.Datacenter}, // "dc1" LAN
|
||||
{Name: dc2.Datacenter}, // "dc2" LAN
|
||||
}
|
||||
if ct.includeDC3 {
|
||||
networks = append(networks, &topology.Network{Name: dc3.Datacenter})
|
||||
}
|
||||
|
||||
// Build final configuration
|
||||
ct.Cfg = &topology.Config{
|
||||
Images: utils.TargetImages(),
|
||||
Networks: []*topology.Network{
|
||||
{Name: dc1.Datacenter}, // "dc1" LAN
|
||||
{Name: dc2.Datacenter}, // "dc2" LAN
|
||||
{Name: dc3.Datacenter}, // "dc3" LAN
|
||||
{Name: "wan", Type: "wan"},
|
||||
},
|
||||
Clusters: []*topology.Cluster{
|
||||
dc1,
|
||||
dc2,
|
||||
dc3,
|
||||
},
|
||||
Networks: networks,
|
||||
Clusters: clusters,
|
||||
Peerings: peerings,
|
||||
}
|
||||
return &ct
@ -142,10 +178,12 @@ func (ct *commonTopo) postLaunchChecks(t *testing.T) {
|
|||
for _, e := range clu.InitialConfigEntries {
|
||||
if e.GetKind() == api.ExportedServices {
|
||||
asExport := e.(*api.ExportedServicesConfigEntry)
|
||||
// do we care about the partition?
|
||||
for _, svc := range asExport.Services {
|
||||
for _, con := range svc.Consumers {
|
||||
// do we care about con.Partition?
|
||||
// if Peer is unset, this is an export to another partition in the same DC, so we don't need to check it
|
||||
if con.Peer == "" {
|
||||
continue
|
||||
}
|
||||
// TODO: surely there is code to normalize this
|
||||
partition := asExport.Partition
|
||||
if partition == "" {
@ -183,6 +221,9 @@ func (ct *commonTopo) postLaunchChecks(t *testing.T) {
|
|||
// PeerName is how you'd address a remote dc+partition locally
|
||||
// as your peer name.
|
||||
func LocalPeerName(clu *topology.Cluster, partition string) string {
|
||||
if partition == "" {
|
||||
partition = "default"
|
||||
}
|
||||
return fmt.Sprintf("peer-%s-%s", clu.Datacenter, partition)
|
||||
}
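LocalPeerName simply formats the peering name from the remote cluster's datacenter and partition, defaulting an empty partition; a small standalone sketch of the same formatting (the helper name here is illustrative):

package main

import "fmt"

// localPeerName mirrors the helper above: "peer-<datacenter>-<partition>",
// with an empty partition treated as "default".
func localPeerName(datacenter, partition string) string {
	if partition == "" {
		partition = "default"
	}
	return fmt.Sprintf("peer-%s-%s", datacenter, partition)
}

func main() {
	fmt.Println(localPeerName("dc2", ""))      // peer-dc2-default
	fmt.Println(localPeerName("dc3", "part1")) // peer-dc3-part1
}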
@ -227,7 +268,7 @@ func (ct *commonTopo) AddServiceNode(clu *topology.Cluster, svc serviceExt) *top
|
|||
nodeKind := topology.NodeKindClient
|
||||
// TODO: bug in deployer somewhere; it should guard against a KindDataplane node with
|
||||
// DisableServiceMesh services on it; dataplane is only for service-mesh
|
||||
if !svc.DisableServiceMesh && clu.Datacenter == agentlessDC {
|
||||
if !svc.DisableServiceMesh && clu.Datacenter == ct.agentlessDC {
|
||||
nodeKind = topology.NodeKindDataplane
|
||||
}
@ -306,13 +347,9 @@ func (ct *commonTopo) ClusterByDatacenter(t *testing.T, name string) *topology.C
|
|||
return nil
|
||||
}
|
||||
|
||||
// Since CE config entries do not contain the partition field,
|
||||
// this func converts default partition to empty string.
|
||||
// Deprecated: topoutil.ConfigEntryPartition
|
||||
func ConfigEntryPartition(p string) string {
|
||||
if p == "default" {
|
||||
return "" // make this CE friendly
|
||||
}
|
||||
return p
|
||||
return topoutil.ConfigEntryPartition(p)
|
||||
}
|
||||
|
||||
// DisableNode is a no-op if the node is already disabled.
@ -335,7 +372,7 @@ func EnableNode(t *testing.T, cfg *topology.Config, clusterName string, nid topo
|
|||
return cfg
|
||||
}
|
||||
|
||||
func setupGlobals(clu *topology.Cluster) {
|
||||
func (ct *commonTopo) setupGlobals(clu *topology.Cluster) {
|
||||
for _, part := range clu.Partitions {
|
||||
clu.InitialConfigEntries = append(clu.InitialConfigEntries,
|
||||
&api.ProxyConfigEntry{
@ -349,6 +386,9 @@ func setupGlobals(clu *topology.Cluster) {
|
|||
Mode: api.MeshGatewayModeLocal,
|
||||
},
|
||||
},
|
||||
)
|
||||
if ct.peerThroughMGW {
|
||||
clu.InitialConfigEntries = append(clu.InitialConfigEntries,
|
||||
&api.MeshConfigEntry{
|
||||
Peering: &api.PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: true,
@ -357,27 +397,52 @@ func setupGlobals(clu *topology.Cluster) {
|
|||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addMeshGateways adds a mesh gateway for every partition in the cluster.
|
||||
// Assumes that the LAN network name is equal to datacenter name.
|
||||
func addMeshGateways(c *topology.Cluster) {
|
||||
func (ct *commonTopo) addMeshGateways(c *topology.Cluster) {
|
||||
nodeKind := topology.NodeKindClient
|
||||
if c.Datacenter == agentlessDC {
|
||||
if c.Datacenter == ct.agentlessDC {
|
||||
nodeKind = topology.NodeKindDataplane
|
||||
}
|
||||
for _, p := range c.Partitions {
|
||||
c.Nodes = topology.MergeSlices(c.Nodes, topoutil.NewTopologyMeshGatewaySet(
|
||||
sid, nodes := newTopologyMeshGatewaySet(
|
||||
nodeKind,
|
||||
p.Name,
|
||||
fmt.Sprintf("%s-%s-mgw", c.Name, p.Name),
|
||||
1,
|
||||
[]string{c.Datacenter, "wan"},
|
||||
nil,
|
||||
))
|
||||
)
|
||||
c.Nodes = topology.MergeSlices(c.Nodes, nodes)
|
||||
// for services exported in the same cluster between partitions, we need
|
||||
// to export the mesh gateway (but not for peering)
|
||||
// https://github.com/hashicorp/consul/pull/19052
|
||||
consumers := []api.ServiceConsumer{}
|
||||
for _, cp := range c.Partitions {
|
||||
if cp.Name == p.Name {
|
||||
continue
|
||||
}
|
||||
consumers = append(consumers, api.ServiceConsumer{
|
||||
Partition: cp.Name,
|
||||
})
|
||||
}
|
||||
if len(consumers) > 0 {
|
||||
ct.ExportService(c, p.Name, api.ExportedService{
|
||||
Name: sid.Name,
|
||||
Namespace: sid.Namespace,
|
||||
Consumers: consumers,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func clusterWithJustServers(name string, numServers int) *topology.Cluster {
|
||||
func (ct *commonTopo) clusterWithJustServers(name string, numServers int) *topology.Cluster {
|
||||
nets := []string{name}
|
||||
if !ct.peerThroughMGW {
|
||||
nets = append(nets, "wan")
|
||||
}
|
||||
return &topology.Cluster{
|
||||
Enterprise: utils.IsEnterprise(),
|
||||
Name: name,
@ -385,7 +450,7 @@ func clusterWithJustServers(name string, numServers int) *topology.Cluster {
|
|||
Nodes: topoutil.NewTopologyServerSet(
|
||||
name+"-server",
|
||||
numServers,
|
||||
[]string{name},
|
||||
nets,
|
||||
nil,
|
||||
),
|
||||
}
@ -446,3 +511,16 @@ func NewFortioServiceWithDefaults(
|
|||
) *topology.Service {
|
||||
return topoutil.NewFortioServiceWithDefaults(cluster, sid, topology.NodeVersionV1, mut)
|
||||
}
|
||||
|
||||
func newTopologyMeshGatewaySet(
|
||||
nodeKind topology.NodeKind,
|
||||
partition string,
|
||||
namePrefix string,
|
||||
num int,
|
||||
networks []string,
|
||||
mutateFn func(i int, node *topology.Node),
|
||||
) (topology.ServiceID, []*topology.Node) {
|
||||
nodes := topoutil.NewTopologyMeshGatewaySet(nodeKind, partition, namePrefix, num, networks, mutateFn)
|
||||
sid := nodes[0].Services[0].ID
|
||||
return sid, nodes
|
||||
}
@ -198,24 +198,6 @@ func (a *Asserter) HealthyWithPeer(t *testing.T, cluster string, sid topology.Se
|
|||
})
|
||||
}
|
||||
|
||||
func (a *Asserter) UpstreamEndpointHealthy(t *testing.T, svc *topology.Service, upstream *topology.Upstream) {
|
||||
t.Helper()
|
||||
node := svc.Node
|
||||
ip := node.LocalAddress()
|
||||
port := svc.EnvoyAdminPort
|
||||
addr := fmt.Sprintf("%s:%d", ip, port)
|
||||
|
||||
client := a.mustGetHTTPClient(t, node.Cluster)
|
||||
libassert.AssertUpstreamEndpointStatusWithClient(t,
|
||||
client,
|
||||
addr,
|
||||
// TODO: what is default? namespace? partition?
|
||||
fmt.Sprintf("%s.default.%s.external", upstream.ID.Name, upstream.Peer),
|
||||
"HEALTHY",
|
||||
1,
|
||||
)
|
||||
}
|
||||
|
||||
type testingT interface {
|
||||
require.TestingT
|
||||
Helper()
@ -96,6 +96,10 @@ func NewTopologyMeshGatewaySet(
|
|||
mutateFn func(i int, node *topology.Node),
|
||||
) []*topology.Node {
|
||||
var out []*topology.Node
|
||||
sid := topology.ServiceID{
|
||||
Name: "mesh-gateway",
|
||||
Partition: ConfigEntryPartition(partition),
|
||||
}
|
||||
for i := 1; i <= num; i++ {
|
||||
name := namePrefix + strconv.Itoa(i)
@ -104,7 +108,7 @@ func NewTopologyMeshGatewaySet(
|
|||
Partition: partition,
|
||||
Name: name,
|
||||
Services: []*topology.Service{{
|
||||
ID: topology.ServiceID{Name: "mesh-gateway"},
|
||||
ID: sid,
|
||||
Port: 8443,
|
||||
EnvoyAdminPort: 19000,
|
||||
IsMeshGateway: true,
@ -122,3 +126,12 @@ func NewTopologyMeshGatewaySet(
|
|||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Since CE config entries do not contain the partition field,
|
||||
// this func converts default partition to empty string.
|
||||
func ConfigEntryPartition(p string) string {
|
||||
if p == "default" {
|
||||
return "" // make this CE friendly
|
||||
}
|
||||
return p
|
||||
}
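ConfigEntryPartition makes config entries CE-friendly: per the comment above, CE config entries do not carry a partition field, so the default partition is written as an empty string. A quick standalone sketch of the conversion it performs (the surrounding usage is illustrative):

package main

import "fmt"

// configEntryPartition mirrors the helper above: the default partition becomes
// the empty string so the same entry works in both CE and ENT.
func configEntryPartition(p string) string {
	if p == "default" {
		return ""
	}
	return p
}

func main() {
	for _, p := range []string{"default", "part1", ""} {
		fmt.Printf("%q -> %q\n", p, configEntryPartition(p))
	}
}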
@ -16,13 +16,13 @@ import (
|
|||
"time"
|
||||
|
||||
goretry "github.com/avast/retry-go"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/teris-io/shortid"
|
||||
"github.com/testcontainers/testcontainers-go"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
)
@ -4,17 +4,19 @@
|
|||
package ratelimit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
||||
libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
)
|
||||
|
||||
const (
@ -49,6 +51,7 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|||
cmd string
|
||||
operations []operation
|
||||
mode string
|
||||
enterpriseOnly bool
|
||||
}
|
||||
|
||||
// getKV and putKV are net/RPC calls
@ -69,6 +72,30 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|||
rateLimitType: "global/write",
|
||||
}
|
||||
|
||||
// listPartition and putPartition are gRPC calls
|
||||
listPartition := action{
|
||||
function: func(client *api.Client) error {
|
||||
ctx := context.Background()
|
||||
_, _, err := client.Partitions().List(ctx, nil)
|
||||
return err
|
||||
},
|
||||
rateLimitOperation: "/partition.PartitionService/List",
|
||||
rateLimitType: "global/read",
|
||||
}
|
||||
|
||||
putPartition := action{
|
||||
function: func(client *api.Client) error {
|
||||
ctx := context.Background()
|
||||
p := api.Partition{
|
||||
Name: "ptest",
|
||||
}
|
||||
_, _, err := client.Partitions().Create(ctx, &p, nil)
|
||||
return err
|
||||
},
|
||||
rateLimitOperation: "/partition.PartitionService/Write",
|
||||
rateLimitType: "global/write",
|
||||
}
|
||||
|
||||
testCases := []testCase{
|
||||
// HTTP & net/RPC
|
||||
{
@ -128,9 +155,73 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
// gRPC
|
||||
{
|
||||
description: "GRPC / Mode: disabled - errors: no / exceeded logs: no / metrics: no",
|
||||
cmd: `-hcl=limits { request_limits { mode = "disabled" read_rate = 0 write_rate = 0 }}`,
|
||||
mode: "disabled",
|
||||
operations: []operation{
|
||||
{
|
||||
action: putPartition,
|
||||
expectedErrorMsg: "",
|
||||
expectExceededLog: false,
|
||||
expectMetric: false,
|
||||
},
|
||||
{
|
||||
action: listPartition,
|
||||
expectedErrorMsg: "",
|
||||
expectExceededLog: false,
|
||||
expectMetric: false,
|
||||
},
|
||||
},
|
||||
enterpriseOnly: true,
|
||||
},
|
||||
{
|
||||
description: "GRPC / Mode: permissive - errors: no / exceeded logs: yes / metrics: no",
|
||||
cmd: `-hcl=limits { request_limits { mode = "permissive" read_rate = 0 write_rate = 0 }}`,
|
||||
mode: "permissive",
|
||||
operations: []operation{
|
||||
{
|
||||
action: putPartition,
|
||||
expectedErrorMsg: "",
|
||||
expectExceededLog: true,
|
||||
expectMetric: true,
|
||||
},
|
||||
{
|
||||
action: listPartition,
|
||||
expectedErrorMsg: "",
|
||||
expectExceededLog: true,
|
||||
expectMetric: true,
|
||||
},
|
||||
},
|
||||
enterpriseOnly: true,
|
||||
},
|
||||
{
|
||||
description: "GRPC / Mode: enforcing - errors: yes / exceeded logs: yes / metrics: yes",
|
||||
cmd: `-hcl=limits { request_limits { mode = "enforcing" read_rate = 0 write_rate = 0 }}`,
|
||||
mode: "enforcing",
|
||||
operations: []operation{
|
||||
{
|
||||
action: putPartition,
|
||||
expectedErrorMsg: nonRetryableErrorMsg,
|
||||
expectExceededLog: true,
|
||||
expectMetric: true,
|
||||
},
|
||||
{
|
||||
action: listPartition,
|
||||
expectedErrorMsg: retryableErrorMsg,
|
||||
expectExceededLog: true,
|
||||
expectMetric: true,
|
||||
},
|
||||
},
|
||||
enterpriseOnly: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
if tc.enterpriseOnly && !utils.IsEnterprise() {
|
||||
continue
|
||||
}
|
||||
tc := tc
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
t.Parallel()
@ -98,23 +98,26 @@ func DockerImages(
|
|||
run *runner.Runner,
|
||||
t *topology.Topology,
|
||||
) error {
|
||||
logw := logger.Named("docker").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug})
|
||||
|
||||
built := make(map[string]struct{})
|
||||
for _, c := range t.Clusters {
|
||||
for _, n := range c.Nodes {
|
||||
joint := n.Images.EnvoyConsulImage()
|
||||
if _, ok := built[joint]; joint != "" && !ok {
|
||||
logger.Info("building image", "image", joint)
|
||||
err := run.DockerExec(context.TODO(), []string{
|
||||
logger.Info("building envoy+consul image", "image", joint)
|
||||
logw := logger.Named("docker_envoy_consul").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug})
|
||||
|
||||
err := run.DockerExecWithStderr(context.TODO(), []string{
|
||||
"build",
|
||||
// provenance causes non-idempotent builds, which leads to spurious terraform replacements
|
||||
"--provenance=false",
|
||||
"--build-arg",
|
||||
"CONSUL_IMAGE=" + n.Images.Consul,
|
||||
"--build-arg",
|
||||
"ENVOY_IMAGE=" + n.Images.Envoy,
|
||||
"-t", joint,
|
||||
"-",
|
||||
}, logw, strings.NewReader(dockerfileEnvoy))
|
||||
}, logw, logw, strings.NewReader(dockerfileEnvoy))
|
||||
if err != nil {
|
||||
return err
|
||||
}
@ -124,14 +127,16 @@ func DockerImages(
|
|||
|
||||
cdp := n.Images.LocalDataplaneImage()
|
||||
if _, ok := built[cdp]; cdp != "" && !ok {
|
||||
logger.Info("building image", "image", cdp)
|
||||
err := run.DockerExec(context.TODO(), []string{
|
||||
logger.Info("building dataplane image", "image", cdp)
|
||||
logw := logger.Named("docker_dataplane").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug})
|
||||
err := run.DockerExecWithStderr(context.TODO(), []string{
|
||||
"build",
|
||||
"--provenance=false",
|
||||
"--build-arg",
|
||||
"DATAPLANE_IMAGE=" + n.Images.Dataplane,
|
||||
"-t", cdp,
|
||||
"-",
|
||||
}, logw, strings.NewReader(dockerfileDataplane))
|
||||
}, logw, logw, strings.NewReader(dockerfileDataplane))
|
||||
if err != nil {
|
||||
return err
|
||||
}
@ -142,7 +147,8 @@ func DockerImages(
|
|||
cdpTproxy := n.Images.LocalDataplaneTProxyImage()
|
||||
if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok {
|
||||
logger.Info("building image", "image", cdpTproxy)
|
||||
err := run.DockerExec(context.TODO(), []string{
|
||||
logw := logger.Named("docker_dataplane_tproxy").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug})
|
||||
err := run.DockerExecWithStderr(context.TODO(), []string{
|
||||
"build",
|
||||
"--build-arg",
|
||||
"DATAPLANE_IMAGE=" + n.Images.Dataplane,
@ -150,7 +156,7 @@ func DockerImages(
|
|||
"CONSUL_IMAGE=" + n.Images.Consul,
|
||||
"-t", cdpTproxy,
|
||||
"-",
|
||||
}, logw, strings.NewReader(dockerfileDataplaneForTProxy))
|
||||
}, logw, logw, strings.NewReader(dockerfileDataplaneForTProxy))
|
||||
if err != nil {
|
||||
return err
|
||||
}
@ -7,6 +7,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
@ -62,6 +63,8 @@ func (s *Sprawl) initPeerings() error {
|
|||
req1.Partition = peering.Accepting.Partition
|
||||
}
|
||||
|
||||
s.awaitMeshGateways()
|
||||
|
||||
GENTOKEN:
|
||||
resp, _, err := acceptingClient.Peerings().GenerateToken(context.Background(), req1, nil)
|
||||
if err != nil {
@ -108,6 +111,7 @@ func (s *Sprawl) initPeerings() error {
|
|||
}
|
||||
|
||||
func (s *Sprawl) waitForPeeringEstablishment() error {
|
||||
s.awaitMeshGateways()
|
||||
var (
|
||||
logger = s.logger.Named("peering")
|
||||
)
@ -181,3 +185,64 @@ func (s *Sprawl) checkPeeringDirection(logger hclog.Logger, client *api.Client,
|
|||
}
|
||||
logger.Debug("peering is active", "dur", time.Since(startTime).Round(time.Second))
|
||||
}
|
||||
|
||||
func (s *Sprawl) awaitMeshGateways() {
|
||||
startTime := time.Now()
|
||||
s.logger.Info("awaiting mesh gateways")
|
||||
// TODO: maybe a better way to do this
|
||||
mgws := []*topology.Service{}
|
||||
for _, clu := range s.topology.Clusters {
|
||||
for _, node := range clu.Nodes {
|
||||
for _, svc := range node.Services {
|
||||
if svc.IsMeshGateway {
|
||||
mgws = append(mgws, svc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: parallel
|
||||
for _, mgw := range mgws {
|
||||
cl := s.clients[mgw.Node.Cluster]
|
||||
logger := s.logger.With("cluster", mgw.Node.Cluster, "sid", mgw.ID, "nid", mgw.Node.ID())
|
||||
logger.Info("awaiting MGW readiness")
|
||||
RETRY:
|
||||
// TODO: not sure if there's a better way to check if the MGW is ready
|
||||
svcs, _, err := cl.Catalog().Service(mgw.ID.Name, "", &api.QueryOptions{
|
||||
Namespace: mgw.ID.Namespace,
|
||||
Partition: mgw.ID.Partition,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Debug("fetching MGW service", "err", err)
|
||||
time.Sleep(time.Second)
|
||||
goto RETRY
|
||||
}
|
||||
if len(svcs) < 1 {
|
||||
logger.Debug("no MGW service in catalog yet")
|
||||
time.Sleep(time.Second)
|
||||
goto RETRY
|
||||
}
|
||||
if len(svcs) > 1 {
|
||||
// not sure when this would happen
|
||||
log.Fatalf("expected 1 MGW service, actually: %#v", svcs)
|
||||
}
|
||||
|
||||
entries, _, err := cl.Health().Service(mgw.ID.Name, "", true, &api.QueryOptions{
|
||||
Namespace: mgw.ID.Namespace,
|
||||
Partition: mgw.ID.Partition,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Debug("fetching MGW checks", "err", err)
|
||||
time.Sleep(time.Second)
|
||||
goto RETRY
|
||||
}
|
||||
if len(entries) != 1 {
|
||||
logger.Debug("expected 1 MGW entry", "entries", entries)
|
||||
time.Sleep(time.Second)
|
||||
goto RETRY
|
||||
}
|
||||
|
||||
logger.Debug("MGW ready", "entry", *(entries[0]), "dur", time.Since(startTime).Round(time.Second))
|
||||
}
|
||||
s.logger.Info("mesh gateways ready", "dur", time.Since(startTime).Round(time.Second))
|
||||
}
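awaitMeshGateways above polls the catalog and health endpoints once per second, via goto-based retries, until every mesh gateway has exactly one passing instance. The same wait could be phrased with a small generic polling helper; the sketch below is illustrative only (poll and the hard-coded timeout are assumptions, not code from this commit):

package main

import (
	"fmt"
	"time"
)

// poll runs check once per interval until it reports ready or the deadline passes.
func poll(timeout, interval time.Duration, check func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		ready, err := check()
		if ready {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for mesh gateway: last error: %v", err)
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	// stand-in for the per-gateway catalog/health lookups done in awaitMeshGateways
	err := poll(10*time.Second, time.Second, func() (bool, error) {
		return time.Since(start) > 2*time.Second, nil
	})
	fmt.Println("ready:", err == nil)
}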
@ -34,6 +34,8 @@ import (
|
|||
// Sprawl is the definition of a complete running Consul deployment topology.
|
||||
type Sprawl struct {
|
||||
logger hclog.Logger
|
||||
// set after initial Launch is complete
|
||||
launchLogger hclog.Logger
|
||||
runner *runner.Runner
|
||||
license string
|
||||
secrets secrets.Store
@ -212,11 +214,20 @@ func Launch(
|
|||
return nil, fmt.Errorf("error gathering diagnostic details: %w", err)
|
||||
}
|
||||
|
||||
s.launchLogger = s.logger
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Sprawl) Relaunch(
|
||||
cfg *topology.Config,
|
||||
) error {
|
||||
return s.RelaunchWithPhase(cfg, "")
|
||||
}
|
||||
|
||||
func (s *Sprawl) RelaunchWithPhase(
|
||||
cfg *topology.Config,
|
||||
phase string,
|
||||
) error {
|
||||
// Copy this BEFORE compiling so we capture the original definition, without denorms.
|
||||
var err error
@ -225,6 +236,10 @@ func (s *Sprawl) Relaunch(
|
|||
return err
|
||||
}
|
||||
|
||||
if phase != "" {
|
||||
s.logger = s.launchLogger.Named(phase)
|
||||
}
|
||||
|
||||
newTopology, err := topology.Recompile(s.logger.Named("recompile"), cfg, s.topology)
|
||||
if err != nil {
|
||||
return fmt.Errorf("topology.Compile: %w", err)
|