From 65592d91a869f1424b0e9f45245cd745c0d026f4 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Fri, 3 Nov 2023 11:43:43 -0500 Subject: [PATCH] chore: apply enterprise changes that were missed to some testing files (#19504) This should align between CE ef35525 and ENT 7f95226dbe40151c8f17dd4464784b60cf358dc1 in: - testing/integration/consul-container - test-integ - testing/deployer --- .../peering_commontopo/ac1_basic_test.go | 2 - .../ac3_service_defaults_upstream_test.go | 1 - .../ac4_proxy_defaults_test.go | 1 - .../ac5_2_pq_failover_test.go | 6 +- .../peering_commontopo/ac6_failovers_test.go | 838 +++++++++--------- .../ac7_1_rotate_gw_test.go | 9 +- .../ac7_2_rotate_leader_test.go | 9 +- test-integ/peering_commontopo/commontopo.go | 184 ++-- test-integ/topoutil/asserter.go | 18 - test-integ/topoutil/fixtures.go | 15 +- .../consul-container/libs/cluster/cluster.go | 4 +- .../test/ratelimit/ratelimit_test.go | 103 ++- .../deployer/sprawl/internal/build/docker.go | 24 +- testing/deployer/sprawl/peering.go | 65 ++ testing/deployer/sprawl/sprawl.go | 23 +- 15 files changed, 793 insertions(+), 509 deletions(-) diff --git a/test-integ/peering_commontopo/ac1_basic_test.go b/test-integ/peering_commontopo/ac1_basic_test.go index 85aaee4e6b..ae729732e7 100644 --- a/test-integ/peering_commontopo/ac1_basic_test.go +++ b/test-integ/peering_commontopo/ac1_basic_test.go @@ -232,8 +232,6 @@ func (s *ac1BasicSuite) test(t *testing.T, ct *commonTopo) { // probably not worth the speed boost ct.Assert.HealthyWithPeer(t, dc.Name, svcServerHTTP.ID, LocalPeerName(peer, "default")) ct.Assert.HealthyWithPeer(t, dc.Name, svcServerTCP.ID, LocalPeerName(peer, "default")) - ct.Assert.UpstreamEndpointHealthy(t, svcClientTCP, ac.upstreamTCP) - ct.Assert.UpstreamEndpointHealthy(t, svcClientTCP, ac.upstreamHTTP) tcs := []struct { acSub int diff --git a/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go b/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go index b8c7b83f2a..770e0ff90c 100644 --- a/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go +++ b/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go @@ -172,7 +172,6 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) { // these could be done parallel with each other, but complexity // probably not worth the speed boost ct.Assert.HealthyWithPeer(t, dc.Name, svcServer.ID, LocalPeerName(peer, "default")) - ct.Assert.UpstreamEndpointHealthy(t, svcClient, s.upstream) // TODO: we need to let the upstream start serving properly before we do this. if it // isn't ready and returns a 5xx (which it will do if it's not up yet!), it will stick // in a down state for PassiveHealthCheck.Interval diff --git a/test-integ/peering_commontopo/ac4_proxy_defaults_test.go b/test-integ/peering_commontopo/ac4_proxy_defaults_test.go index a19782bbab..28df1c3d4c 100644 --- a/test-integ/peering_commontopo/ac4_proxy_defaults_test.go +++ b/test-integ/peering_commontopo/ac4_proxy_defaults_test.go @@ -159,7 +159,6 @@ func (s *ac4ProxyDefaultsSuite) test(t *testing.T, ct *commonTopo) { // preconditions check ct.Assert.HealthyWithPeer(t, dc.Name, serverSVC.ID, LocalPeerName(peer, "default")) - ct.Assert.UpstreamEndpointHealthy(t, clientSVC, s.upstream) ct.Assert.FortioFetch2HeaderEcho(t, clientSVC, s.upstream) t.Run("Validate services exist in catalog", func(t *testing.T) { diff --git a/test-integ/peering_commontopo/ac5_2_pq_failover_test.go b/test-integ/peering_commontopo/ac5_2_pq_failover_test.go index 3bf8c9be9a..dec0de3ea1 100644 --- a/test-integ/peering_commontopo/ac5_2_pq_failover_test.go +++ b/test-integ/peering_commontopo/ac5_2_pq_failover_test.go @@ -30,11 +30,15 @@ type ac5_2PQFailoverSuite struct { serverSID topology.ServiceID nodeServer topology.NodeID } +type nodeKey struct { + dc string + partition string +} var ac5_2Context = make(map[nodeKey]ac5_2PQFailoverSuite) func TestAC5PreparedQueryFailover(t *testing.T) { - ct := NewCommonTopo(t) + ct := newCommonTopo(t, "dc2", true, true) s := &ac5_2PQFailoverSuite{} s.setup(t, ct) ct.Launch(t) diff --git a/test-integ/peering_commontopo/ac6_failovers_test.go b/test-integ/peering_commontopo/ac6_failovers_test.go index fe3cd181b2..25e734200f 100644 --- a/test-integ/peering_commontopo/ac6_failovers_test.go +++ b/test-integ/peering_commontopo/ac6_failovers_test.go @@ -14,419 +14,453 @@ import ( "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" ) -// note: unlike other *Suite structs that are per-peering direction, -// this one is special and does all directions itself, because the -// setup is not exactly symmetrical type ac6FailoversSuite struct { - ac6 map[nodeKey]ac6FailoversContext -} -type ac6FailoversContext struct { - clientSID topology.ServiceID - serverSID topology.ServiceID + // inputs + // with all false, this gives us a scenario with: + // - a "near" server in the accepter cluster (DC1), partitition default, namespace default + // - a "far" server in the dialer cluster (DC2), partition default, namespace default + // - a client in the accepter cluster (DC1), partition default, namespace default, with: + // - upstream near server (DC1) + // - failover to far server (DC2) + // + // TODO: technically if NearInDial && !FarInAcc (i.e., near == far), then we're not doing peering at all, + // and could do this test in a single DC + // when true, put the client (and its default upstream server) in the dialer peer; otherwise, put client in accepter + NearInDial bool + // when true, put the client (and its default upstream server) in the nondefault partition/namespace; otherwise in the default + NearInPartAlt bool + NearInNSAlt bool + // when true, put far server to the accepter peer; otherwise the dialer + FarInAcc bool + // when true, put far server to nondefault partition/namespace (ENT-only); otherwise, failover to default + FarInPartAlt bool + FarInNSAlt bool + + // launch outputs, for querying during test + clientSID topology.ServiceID + // near = same DC as client; far = other DC + nearServerSID topology.ServiceID // used to remove the node and trigger failover - serverNode topology.NodeID -} -type nodeKey struct { - dc string - partition string + nearServerNode topology.NodeID + farServerSID topology.ServiceID + farServerNode topology.NodeID } // Note: this test cannot share topo func TestAC6Failovers(t *testing.T) { - ct := NewCommonTopo(t) - s := &ac6FailoversSuite{} - s.setup(t, ct) - ct.Launch(t) - s.test(t, ct) -} - -func (s *ac6FailoversSuite) setup(t *testing.T, ct *commonTopo) { - // TODO: update setups to loop through a cluster's partitions+namespaces internally - s.setupAC6Failovers(ct, ct.DC1, ct.DC2) - s.setupAC6Failovers(ct, ct.DC2, ct.DC1) - s.setupAC6FailoversDC3(ct, ct.DC3, ct.DC1, ct.DC2) -} - -// dc1 is peered with dc2 and dc3. -// dc1 has an ac6-client in "default" and "part1" partitions (only default in CE). -// ac6-client has a single upstream ac6-failover-svc in its respective partition^. -// -// ac6-failover-svc has the following failovers: -// - peer-dc2-default -// - peer-dc2-part1 (not in CE) -// - peer-dc3-default -// -// This setup is mirrored from dc2->dc1 as well -// (both dcs have dc3 as the last failover target) -// -// ^NOTE: There are no cross-partition upstreams because MeshGatewayMode = local -// and failover information gets stripped out by the mesh gateways so we -// can't test failovers. -func (s *ac6FailoversSuite) setupAC6Failovers(ct *commonTopo, clu, peerClu *topology.Cluster) { - for _, part := range clu.Partitions { - partition := part.Name - - // There is a peering per partition in the peered cluster - var peers []string - for _, peerPart := range peerClu.Partitions { - peers = append(peers, LocalPeerName(peerClu, peerPart.Name)) + // bit banging to get all permutations of all params + const nParams = 3 + // i.e 2**nParams + const n = int(1) << nParams + for i := 0; i < n; i++ { + s := ac6FailoversSuite{ + // xth bit == 1 + NearInDial: (i>>0)&1 == 1, + NearInPartAlt: (i>>1)&1 == 1, + FarInPartAlt: (i>>2)&1 == 1, } - - // Make an HTTP server with various failover targets - serverSID := topology.ServiceID{ - Name: "ac6-failover-svc", - Partition: partition, - } - server := NewFortioServiceWithDefaults( - clu.Datacenter, - serverSID, - nil, - ) - // Export to all known peers - ct.ExportService(clu, partition, - api.ExportedService{ - Name: server.ID.Name, - Consumers: func() []api.ServiceConsumer { - var consumers []api.ServiceConsumer - for _, peer := range peers { - consumers = append(consumers, api.ServiceConsumer{ - Peer: peer, - }) - } - return consumers - }(), - }, - ) - serverNode := ct.AddServiceNode(clu, serviceExt{Service: server}) - - clu.InitialConfigEntries = append(clu.InitialConfigEntries, - &api.ServiceConfigEntry{ - Kind: api.ServiceDefaults, - Name: server.ID.Name, - Partition: ConfigEntryPartition(partition), - Protocol: "http", - }, - &api.ServiceResolverConfigEntry{ - Kind: api.ServiceResolver, - Name: server.ID.Name, - Partition: ConfigEntryPartition(partition), - Failover: map[string]api.ServiceResolverFailover{ - "*": { - Targets: func() []api.ServiceResolverFailoverTarget { - // Make a failover target for every partition in the peer cluster - var targets []api.ServiceResolverFailoverTarget - for _, peer := range peers { - targets = append(targets, api.ServiceResolverFailoverTarget{ - Peer: peer, - }) - } - // Just hard code default partition for dc3, since the exhaustive - // testing will be done against dc2. - targets = append(targets, api.ServiceResolverFailoverTarget{ - Peer: "peer-dc3-default", - }) - return targets - }(), - }, - }, - }, - ) - - // Make client which will dial server - clientSID := topology.ServiceID{ - Name: "ac6-client", - Partition: partition, - } - client := NewFortioServiceWithDefaults( - clu.Datacenter, - clientSID, - func(s *topology.Service) { - // Upstream per partition - s.Upstreams = []*topology.Upstream{ - { - ID: topology.ServiceID{ - Name: server.ID.Name, - Partition: part.Name, - }, - LocalPort: 5000, - // exposed so we can hit it directly - // TODO: we shouldn't do this; it's not realistic - LocalAddress: "0.0.0.0", - }, - } - }, - ) - ct.ExportService(clu, partition, - api.ExportedService{ - Name: client.ID.Name, - Consumers: func() []api.ServiceConsumer { - var consumers []api.ServiceConsumer - // Export to each peer - for _, peer := range peers { - consumers = append(consumers, api.ServiceConsumer{ - Peer: peer, - }) - } - return consumers - }(), - }, - ) - ct.AddServiceNode(clu, serviceExt{Service: client}) - - clu.InitialConfigEntries = append(clu.InitialConfigEntries, - &api.ServiceConfigEntry{ - Kind: api.ServiceDefaults, - Name: client.ID.Name, - Partition: ConfigEntryPartition(partition), - Protocol: "http", - }, - ) - - // Add intention allowing local and peered clients to call server - clu.InitialConfigEntries = append(clu.InitialConfigEntries, - &api.ServiceIntentionsConfigEntry{ - Kind: api.ServiceIntentions, - Name: server.ID.Name, - Partition: ConfigEntryPartition(partition), - // SourceIntention for local client and peered clients - Sources: func() []*api.SourceIntention { - ixns := []*api.SourceIntention{ - { - Name: client.ID.Name, - Partition: ConfigEntryPartition(part.Name), - Action: api.IntentionActionAllow, - }, - } - for _, peer := range peers { - ixns = append(ixns, &api.SourceIntention{ - Name: client.ID.Name, - Peer: peer, - Action: api.IntentionActionAllow, - }) - } - return ixns - }(), - }, - ) - if s.ac6 == nil { - s.ac6 = map[nodeKey]ac6FailoversContext{} - } - s.ac6[nodeKey{clu.Datacenter, partition}] = struct { - clientSID topology.ServiceID - serverSID topology.ServiceID - serverNode topology.NodeID - }{ - clientSID: clientSID, - serverSID: serverSID, - serverNode: serverNode.ID(), - } - } -} - -func (s *ac6FailoversSuite) setupAC6FailoversDC3(ct *commonTopo, clu, peer1, peer2 *topology.Cluster) { - var peers []string - for _, part := range peer1.Partitions { - peers = append(peers, LocalPeerName(peer1, part.Name)) - } - for _, part := range peer2.Partitions { - peers = append(peers, LocalPeerName(peer2, part.Name)) - } - - partition := "default" - - // Make an HTTP server - server := NewFortioServiceWithDefaults( - clu.Datacenter, - topology.ServiceID{ - Name: "ac6-failover-svc", - Partition: partition, - }, - nil, - ) - - ct.AddServiceNode(clu, serviceExt{ - Service: server, - Config: &api.ServiceConfigEntry{ - Kind: api.ServiceDefaults, - Name: server.ID.Name, - Partition: ConfigEntryPartition(partition), - Protocol: "http", - }, - Intentions: &api.ServiceIntentionsConfigEntry{ - Kind: api.ServiceIntentions, - Name: server.ID.Name, - Partition: ConfigEntryPartition(partition), - Sources: func() []*api.SourceIntention { - var ixns []*api.SourceIntention - for _, peer := range peers { - ixns = append(ixns, &api.SourceIntention{ - Name: "ac6-client", - Peer: peer, - Action: api.IntentionActionAllow, - }) - } - return ixns - }(), - }, - Exports: func() []api.ServiceConsumer { - var consumers []api.ServiceConsumer - for _, peer := range peers { - consumers = append(consumers, api.ServiceConsumer{ - Peer: peer, - }) - } - return consumers - }(), - }) -} - -func (s *ac6FailoversSuite) test(t *testing.T, ct *commonTopo) { - dc1 := ct.Sprawl.Topology().Clusters["dc1"] - dc2 := ct.Sprawl.Topology().Clusters["dc2"] - - type testcase struct { - name string - cluster *topology.Cluster - peer *topology.Cluster - partition string - } - tcs := []testcase{ - { - name: "dc1 default partition failovers", - cluster: dc1, - peer: dc2, // dc3 is hardcoded - partition: "default", - }, - { - name: "dc1 part1 partition failovers", - cluster: dc1, - peer: dc2, // dc3 is hardcoded - partition: "part1", - }, - { - name: "dc2 default partition failovers", - cluster: dc2, - peer: dc1, // dc3 is hardcoded - partition: "default", - }, - { - name: "dc2 part1 partition failovers", - cluster: dc2, - peer: dc1, // dc3 is hardcoded - partition: "part1", - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - // NOTE: *not parallel* because we mutate resources that are shared - // between test cases (disable/enable nodes) - if !utils.IsEnterprise() && tc.partition != "default" { - t.Skip("skipping enterprise test") - } - partition := tc.partition - clu := tc.cluster - peerClu := tc.peer - - svcs := clu.ServicesByID(s.ac6[nodeKey{clu.Datacenter, partition}].clientSID) - require.Len(t, svcs, 1, "expected exactly one client in datacenter") - - serverSID := s.ac6[nodeKey{clu.Datacenter, partition}].serverSID - serverSID.Normalize() - - client := svcs[0] - require.Len(t, client.Upstreams, 1, "expected one upstream for client") - - u := client.Upstreams[0] - ct.Assert.CatalogServiceExists(t, clu.Name, u.ID.Name, utils.CompatQueryOpts(&api.QueryOptions{ - Partition: u.ID.Partition, - })) - - t.Cleanup(func() { - cfg := ct.Sprawl.Config() - for _, part := range clu.Partitions { - EnableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, part.Name}].serverNode) - } - for _, part := range peerClu.Partitions { - EnableNode(t, cfg, peerClu.Name, s.ac6[nodeKey{peerClu.Datacenter, part.Name}].serverNode) - } - require.NoError(t, ct.Sprawl.Relaunch(cfg)) - }) - - fmt.Println("### preconditions") - // TODO: deduce this number, instead of hard-coding - nFailoverTargets := 4 - // in CE, we don't have failover targets for non-default partitions - if !utils.IsEnterprise() { - nFailoverTargets = 3 - } - for i := 0; i < nFailoverTargets; i++ { - ct.Assert.UpstreamEndpointStatus(t, client, fmt.Sprintf("failover-target~%d~%s", i, clusterPrefix(u, clu.Datacenter)), "HEALTHY", 1) - } - - ct.Assert.FortioFetch2FortioName(t, client, u, clu.Name, serverSID) - - if t.Failed() { - t.Fatalf("failed preconditions") - } - - fmt.Println("### Failover to peer target") - cfg := ct.Sprawl.Config() - DisableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, partition}].serverNode) - require.NoError(t, ct.Sprawl.Relaunch(cfg)) - // Clusters for imported services rely on outlier detection for - // failovers, NOT eds_health_status. This means that killing the - // node above does not actually make the envoy cluster UNHEALTHY - // so we do not assert for it. - expectUID := topology.ServiceID{ - Name: u.ID.Name, - Partition: "default", - } - expectUID.Normalize() - ct.Assert.FortioFetch2FortioName(t, client, u, peerClu.Name, expectUID) - - if utils.IsEnterprise() { - fmt.Println("### Failover to peer target in non-default partition") - cfg = ct.Sprawl.Config() - DisableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, partition}].serverNode) - DisableNode(t, cfg, peerClu.Name, s.ac6[nodeKey{peerClu.Datacenter, "default"}].serverNode) - require.NoError(t, ct.Sprawl.Relaunch(cfg)) - // Retry until outlier_detection deems the cluster - // unhealthy and fails over to peer part1. - expectUID = topology.ServiceID{ - Name: u.ID.Name, - Partition: "part1", - } - expectUID.Normalize() - ct.Assert.FortioFetch2FortioName(t, client, u, peerClu.Name, expectUID) - } - - fmt.Println("### Failover to dc3 peer target") - cfg = ct.Sprawl.Config() - DisableNode(t, cfg, clu.Name, s.ac6[nodeKey{clu.Datacenter, partition}].serverNode) - // Disable all partitions for peer - for _, part := range peerClu.Partitions { - DisableNode(t, cfg, peerClu.Name, s.ac6[nodeKey{peerClu.Datacenter, part.Name}].serverNode) - } - require.NoError(t, ct.Sprawl.Relaunch(cfg)) - // This will retry until outlier_detection deems the cluster - // unhealthy and fails over to dc3. - expectUID = topology.ServiceID{ - Name: u.ID.Name, - Partition: "default", - } - expectUID.Normalize() - ct.Assert.FortioFetch2FortioName(t, client, u, "dc3", expectUID) + // ensure the servers are always in separate DCs + s.FarInAcc = s.NearInDial + t.Run(fmt.Sprintf("%02d_%s", i, s.testName()), func(t *testing.T) { + t.Parallel() + ct := NewCommonTopo(t) + s.setup(t, ct) + ct.Launch(t) + s.test(t, ct) }) } } -func clusterPrefix(u *topology.Upstream, dc string) string { - u.ID.Normalize() - switch u.ID.Partition { - case "default": - return fmt.Sprintf("%s.%s.%s.internal", u.ID.Name, u.ID.Namespace, dc) - default: - return fmt.Sprintf("%s.%s.%s.%s.internal-v1", u.ID.Name, u.ID.Namespace, u.ID.Partition, dc) +func TestNET5029Failovers(t *testing.T) { + // TODO: *.{a,b} are not actually peering tests, and should technically be moved elsewhere + suites := map[string]ac6FailoversSuite{ + "1.a": { + FarInAcc: true, + FarInPartAlt: true, + }, + "1.b": { + FarInAcc: true, + FarInNSAlt: true, + }, + "1.c": { + FarInNSAlt: true, + }, + "1.d": { + FarInPartAlt: true, + }, + "2.a": { + FarInAcc: true, + NearInPartAlt: true, + }, + "2.b": { + FarInAcc: true, + NearInNSAlt: true, + }, + "2.c": { + NearInDial: true, + NearInNSAlt: true, + FarInAcc: true, + }, + "2.d": { + NearInDial: true, + NearInPartAlt: true, + FarInAcc: true, + }, + } + for name, s := range suites { + s := s + t.Run(fmt.Sprintf("%s_%s", name, s.testName()), func(t *testing.T) { + if name == "1.b" { + t.Skip("TODO: fails with 503/504") + } + t.Parallel() + ct := NewCommonTopo(t) + s.setup(t, ct) + ct.Launch(t) + s.test(t, ct) + }) } } + +func TestAC6Failovers_AllPermutations(t *testing.T) { + // + t.Skip("Too many permutations") + // bit banging to get all permutations of all params + const nParams = 6 + // i.e 2**nParams + const n = int(1) << nParams + for i := 0; i < n; i++ { + s := ac6FailoversSuite{ + // xth bit == 1 + NearInDial: (i>>0)&1 == 1, + FarInAcc: (i>>1)&1 == 1, + NearInPartAlt: (i>>2)&1 == 1, + FarInPartAlt: (i>>3)&1 == 1, + NearInNSAlt: (i>>4)&1 == 1, + FarInNSAlt: (i>>5)&1 == 1, + } + t.Run(fmt.Sprintf("%02d_%s", i, s.testName()), func(t *testing.T) { + t.Parallel() + ct := NewCommonTopo(t) + s.setup(t, ct) + ct.Launch(t) + s.test(t, ct) + }) + } +} + +func (s *ac6FailoversSuite) testName() (ret string) { + switch s.NearInDial { + case true: + ret += "dial" + default: + ret += "acc" + } + ret += "." + switch s.NearInPartAlt { + case true: + ret += "alt" + default: + ret += "default" + } + ret += "." + switch s.NearInNSAlt { + case true: + ret += "alt" + default: + ret += "default" + } + + ret += "->" + + switch s.FarInAcc { + case true: + ret += "acc" + default: + ret += "dial" + } + ret += "." + switch s.FarInPartAlt { + case true: + ret += "alt" + default: + ret += "default" + } + ret += "." + switch s.FarInNSAlt { + case true: + ret += "alt" + default: + ret += "default" + } + + return +} + +func (s *ac6FailoversSuite) setup(t *testing.T, ct *commonTopo) { + if !utils.IsEnterprise() && (s.NearInPartAlt || s.FarInPartAlt) { + t.Skip("ENT required for nondefault partitions") + } + + nearClu := ct.DC1 + farClu := ct.DC2 + if s.NearInDial { + nearClu = ct.DC2 + } + if s.FarInAcc { + farClu = ct.DC1 + } + + // - server in clientPartition/DC (main target) + nearServerSID := topology.ServiceID{ + Name: "ac6-server", + Partition: ConfigEntryPartition("default"), + Namespace: "default", + } + if s.NearInPartAlt { + nearServerSID.Partition = "part1" + } + if s.NearInNSAlt { + nearServerSID.Namespace = "ns1" + } + nearServer := NewFortioServiceWithDefaults( + nearClu.Datacenter, + nearServerSID, + nil, + ) + nearServerNode := ct.AddServiceNode(nearClu, serviceExt{Service: nearServer}) + + nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries, + &api.ServiceConfigEntry{ + Kind: api.ServiceDefaults, + Name: nearServerSID.Name, + Partition: ConfigEntryPartition(nearServerSID.Partition), + Namespace: nearServerSID.Namespace, + Protocol: "http", + }, + ) + // - server in otherPartition/otherDC + farServerSID := topology.ServiceID{ + Name: nearServerSID.Name, + Partition: "default", + Namespace: "default", + } + if s.FarInPartAlt { + farServerSID.Partition = "part1" + } + if s.FarInNSAlt { + farServerSID.Namespace = "ns1" + } + farServer := NewFortioServiceWithDefaults( + farClu.Datacenter, + farServerSID, + nil, + ) + farServerNode := ct.AddServiceNode(farClu, serviceExt{Service: farServer}) + if nearClu != farClu { + ct.ExportService(farClu, farServerSID.Partition, + api.ExportedService{ + Name: farServerSID.Name, + Namespace: farServerSID.Namespace, + Consumers: []api.ServiceConsumer{ + { + Peer: LocalPeerName(nearClu, nearServerSID.Partition), + }, + }, + }, + ) + } else if nearClu == farClu && farServerSID.Partition != nearServerSID.Partition { + ct.ExportService(farClu, farServerSID.Partition, + api.ExportedService{ + Name: farServerSID.Name, + Namespace: farServerSID.Namespace, + Consumers: []api.ServiceConsumer{ + { + // this must not be "", or else it is basically ignored altogether + // TODO: bug? if this whole struct is empty, that should be an error + Partition: topology.PartitionOrDefault(nearServerSID.Partition), + }, + }, + }, + ) + } + + var targets []api.ServiceResolverFailoverTarget + if nearClu != farClu { + targets = []api.ServiceResolverFailoverTarget{ + { + Service: farServerSID.Name, + Peer: LocalPeerName(farClu, farServerSID.Partition), + Namespace: farServerSID.Namespace, + }, + } + } else { + part := ConfigEntryPartition(farServerSID.Partition) + // weird exception here where target partition set to "" means "inherit from parent" + // TODO: bug? docs say "" -> default: + // https://developer.hashicorp.com/consul/docs/connect/config-entries/service-resolver#failover-targets-partition + if farServerSID.Partition == "default" && nearServerSID.Partition != "default" { + part = "default" + } + targets = []api.ServiceResolverFailoverTarget{ + { + Service: farServerSID.Name, + Partition: part, + Namespace: farServerSID.Namespace, + }, + } + } + + nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries, + &api.ServiceConfigEntry{ + Kind: api.ServiceDefaults, + Name: farServerSID.Name, + Partition: ConfigEntryPartition(farServerSID.Partition), + Namespace: farServerSID.Namespace, + Protocol: "http", + }, + &api.ServiceResolverConfigEntry{ + Kind: api.ServiceResolver, + Name: nearServerSID.Name, + Partition: ConfigEntryPartition(nearServerSID.Partition), + Namespace: nearServerSID.Namespace, + Failover: map[string]api.ServiceResolverFailover{ + "*": { + Targets: targets, + }, + }, + }, + ) + + clientSID := topology.ServiceID{ + Name: "ac6-client", + Partition: nearServerSID.Partition, + Namespace: nearServerSID.Namespace, + } + client := NewFortioServiceWithDefaults( + nearClu.Datacenter, + clientSID, + func(s *topology.Service) { + // Upstream per partition + s.Upstreams = []*topology.Upstream{ + { + ID: topology.ServiceID{ + Name: nearServerSID.Name, + Partition: nearServerSID.Partition, + Namespace: nearServerSID.Namespace, + }, + LocalPort: 5000, + // exposed so we can hit it directly + // TODO: we shouldn't do this; it's not realistic + LocalAddress: "0.0.0.0", + }, + } + }, + ) + ct.AddServiceNode(nearClu, serviceExt{Service: client}) + nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries, + &api.ServiceConfigEntry{ + Kind: api.ServiceDefaults, + Name: clientSID.Name, + Partition: ConfigEntryPartition(clientSID.Partition), + Namespace: clientSID.Namespace, + Protocol: "http", + }, + ) + + // intentions + nearClu.InitialConfigEntries = append(nearClu.InitialConfigEntries, + &api.ServiceIntentionsConfigEntry{ + Kind: api.ServiceIntentions, + Name: nearServerSID.Name, + Partition: ConfigEntryPartition(nearServerSID.Partition), + Namespace: nearServerSID.Namespace, + Sources: []*api.SourceIntention{{ + Name: clientSID.Name, + Namespace: clientSID.Namespace, + // in this field, "" -> destination partition, so no ConfigEntryPartition :eyeroll: + // https://developer.hashicorp.com/consul/docs/connect/config-entries/service-intentions#sources-partition + Partition: topology.PartitionOrDefault(clientSID.Partition), + Action: api.IntentionActionAllow, + }}, + }, + ) + farSource := api.SourceIntention{ + Name: clientSID.Name, + Namespace: clientSID.Namespace, + Peer: LocalPeerName(nearClu, clientSID.Partition), + Action: api.IntentionActionAllow, + } + if nearClu == farClu { + farSource.Peer = "" + // in this field, "" -> destination partition, so no ConfigEntryPartition :eyeroll: + // https://developer.hashicorp.com/consul/docs/connect/config-entries/service-intentions#sources-partition + farSource.Partition = topology.PartitionOrDefault(clientSID.Partition) + } + farClu.InitialConfigEntries = append(farClu.InitialConfigEntries, + &api.ServiceIntentionsConfigEntry{ + Kind: api.ServiceIntentions, + Name: farServerSID.Name, + Partition: ConfigEntryPartition(farServerSID.Partition), + Namespace: farServerSID.Namespace, + Sources: []*api.SourceIntention{&farSource}, + }, + ) + + s.clientSID = clientSID + s.nearServerSID = nearServerSID + s.farServerSID = farServerSID + s.nearServerNode = nearServerNode.ID() + s.farServerNode = farServerNode.ID() +} + +func (s *ac6FailoversSuite) test(t *testing.T, ct *commonTopo) { + // NOTE: *not parallel* because we mutate resources that are shared + // between test cases (disable/enable nodes) + + nearClu := ct.Sprawl.Topology().Clusters["dc1"] + farClu := ct.Sprawl.Topology().Clusters["dc2"] + if s.NearInDial { + nearClu = ct.Sprawl.Topology().Clusters["dc2"] + } + if s.FarInAcc { + farClu = ct.Sprawl.Topology().Clusters["dc1"] + } + + svcs := nearClu.ServicesByID(s.clientSID) + require.Len(t, svcs, 1, "expected exactly one client in datacenter") + + client := svcs[0] + require.Len(t, client.Upstreams, 1, "expected one upstream for client") + upstream := client.Upstreams[0] + + fmt.Println("### preconditions") + + // this is the server in the same DC and partitions as client + serverSID := s.nearServerSID + serverSID.Normalize() + ct.Assert.FortioFetch2FortioName(t, client, upstream, nearClu.Name, serverSID) + + ct.Assert.CatalogServiceExists(t, nearClu.Name, upstream.ID.Name, utils.CompatQueryOpts(&api.QueryOptions{ + Partition: upstream.ID.Partition, + Namespace: upstream.ID.Namespace, + })) + + if t.Failed() { + t.Fatal("failed preconditions") + } + + fmt.Println("### failover") + + cfg := ct.Sprawl.Config() + DisableNode(t, cfg, nearClu.Name, s.nearServerNode) + require.NoError(t, ct.Sprawl.RelaunchWithPhase(cfg, "failover")) + // Clusters for imported services rely on outlier detection for + // failovers, NOT eds_health_status. This means that killing the + // node above does not actually make the envoy cluster UNHEALTHY + // so we do not assert for it. + expectSID := s.farServerSID + expectSID.Normalize() + ct.Assert.FortioFetch2FortioName(t, client, upstream, farClu.Name, expectSID) +} diff --git a/test-integ/peering_commontopo/ac7_1_rotate_gw_test.go b/test-integ/peering_commontopo/ac7_1_rotate_gw_test.go index 89f1ed04f2..a492b95dc0 100644 --- a/test-integ/peering_commontopo/ac7_1_rotate_gw_test.go +++ b/test-integ/peering_commontopo/ac7_1_rotate_gw_test.go @@ -11,8 +11,6 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testing/deployer/topology" "github.com/stretchr/testify/require" - - "github.com/hashicorp/consul/test-integ/topoutil" ) // TestRotateGW ensures that peered services continue to be able to talk to their @@ -143,10 +141,10 @@ func (s *suiteRotateGW) setup(t *testing.T, ct *commonTopo) { // add a second mesh gateway "new" s.newMGWNodeName = fmt.Sprintf("new-%s-default-mgw", clu.Name) nodeKind := topology.NodeKindClient - if clu.Datacenter == agentlessDC { + if clu.Datacenter == ct.agentlessDC { nodeKind = topology.NodeKindDataplane } - clu.Nodes = append(clu.Nodes, topoutil.NewTopologyMeshGatewaySet( + _, mgwNodes := newTopologyMeshGatewaySet( nodeKind, "default", s.newMGWNodeName, @@ -155,7 +153,8 @@ func (s *suiteRotateGW) setup(t *testing.T, ct *commonTopo) { func(i int, node *topology.Node) { node.Disabled = true }, - )...) + ) + clu.Nodes = append(clu.Nodes, mgwNodes...) } func (s *suiteRotateGW) test(t *testing.T, ct *commonTopo) { diff --git a/test-integ/peering_commontopo/ac7_2_rotate_leader_test.go b/test-integ/peering_commontopo/ac7_2_rotate_leader_test.go index a5684ebbc0..36e3c0ced3 100644 --- a/test-integ/peering_commontopo/ac7_2_rotate_leader_test.go +++ b/test-integ/peering_commontopo/ac7_2_rotate_leader_test.go @@ -8,12 +8,13 @@ import ( "testing" "time" - "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" - "github.com/hashicorp/consul/testing/deployer/topology" "github.com/mitchellh/copystructure" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" + "github.com/hashicorp/consul/testing/deployer/topology" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" ) @@ -165,7 +166,7 @@ func (s *ac7_2RotateLeaderSuite) test(t *testing.T, ct *commonTopo) { found := 0 foundI := 0 for i, svc := range ceAsES.Services { - if svc.Name == s.sidServer.Name && svc.Namespace == utils.DefaultToEmpty(s.sidServer.Namespace) { + if svc.Name == s.sidServer.Name && utils.DefaultToEmpty(svc.Namespace) == utils.DefaultToEmpty(s.sidServer.Namespace) { found += 1 foundI = i } @@ -176,7 +177,7 @@ func (s *ac7_2RotateLeaderSuite) test(t *testing.T, ct *commonTopo) { _, _, err = clPeer.ConfigEntries().Set(ceAsES, nil) require.NoError(t, err) t.Cleanup(func() { - //restore for next pairing + // restore for next pairing _, _, err = clPeer.ConfigEntries().Set(origCE.(*api.ExportedServicesConfigEntry), nil) require.NoError(t, err) }) diff --git a/test-integ/peering_commontopo/commontopo.go b/test-integ/peering_commontopo/commontopo.go index 94ada34347..be2ae79cd6 100644 --- a/test-integ/peering_commontopo/commontopo.go +++ b/test-integ/peering_commontopo/commontopo.go @@ -40,6 +40,7 @@ type commonTopo struct { // shortcuts to corresponding entry in Cfg DC1 *topology.Cluster DC2 *topology.Cluster + // nil if includeDC3 is false DC3 *topology.Cluster // set after Launch. Should be considered read-only @@ -48,66 +49,101 @@ type commonTopo struct { // track per-DC services to prevent duplicates services map[string]map[topology.ServiceID]struct{} + + // if zero, no DCs are agentless + agentlessDC string + + // if true, create DC3 and associated links (currently only used by ac5.2) + includeDC3 bool + + peerThroughMGW bool } -const agentlessDC = "dc2" +func NewCommonTopoWithoutAgentless(t *testing.T) *commonTopo { + t.Helper() + return newCommonTopo(t, "", false, true) +} func NewCommonTopo(t *testing.T) *commonTopo { t.Helper() + return newCommonTopo(t, "dc2", false, true) +} - ct := commonTopo{} +func newCommonTopo(t *testing.T, agentlessDC string, includeDC3 bool, peerThroughMGW bool) *commonTopo { + t.Helper() + + ct := commonTopo{ + agentlessDC: agentlessDC, + includeDC3: includeDC3, + peerThroughMGW: peerThroughMGW, + } const nServers = 3 // Make 3-server clusters in dc1 and dc2 // For simplicity, the Name and Datacenter of the clusters are the same. // dc1 and dc2 should be symmetric. - dc1 := clusterWithJustServers("dc1", nServers) + dc1 := ct.clusterWithJustServers("dc1", nServers) ct.DC1 = dc1 - dc2 := clusterWithJustServers("dc2", nServers) + dc2 := ct.clusterWithJustServers("dc2", nServers) ct.DC2 = dc2 - // dc3 is a failover cluster for both dc1 and dc2 - dc3 := clusterWithJustServers("dc3", 1) - // dc3 is only used for certain failover scenarios and does not need tenancies - dc3.Partitions = []*topology.Partition{{Name: "default"}} - ct.DC3 = dc3 + clusters := []*topology.Cluster{dc1, dc2} + + var dc3 *topology.Cluster + + if ct.includeDC3 { + // dc3 is a failover cluster for both dc1 and dc2 + dc3 = ct.clusterWithJustServers("dc3", 1) + // dc3 is only used for certain failover scenarios and does not need tenancies + dc3.Partitions = []*topology.Partition{{Name: "default"}} + ct.DC3 = dc3 + // dc3 is only used for certain failover scenarios and does not need tenancies + dc3.Partitions = []*topology.Partition{{Name: "default"}} + + clusters = append(clusters, dc3) + } injectTenancies(dc1) injectTenancies(dc2) - // dc3 is only used for certain failover scenarios and does not need tenancies - dc3.Partitions = []*topology.Partition{{Name: "default"}} + // dc3 doesn't get tenancies ct.services = map[string]map[topology.ServiceID]struct{}{} - for _, dc := range []*topology.Cluster{dc1, dc2, dc3} { + for _, dc := range clusters { ct.services[dc.Datacenter] = map[topology.ServiceID]struct{}{} } peerings := addPeerings(dc1, dc2) - peerings = append(peerings, addPeerings(dc1, dc3)...) - peerings = append(peerings, addPeerings(dc2, dc3)...) + if ct.includeDC3 { + peerings = append(peerings, addPeerings(dc1, dc3)...) + peerings = append(peerings, addPeerings(dc2, dc3)...) + } - addMeshGateways(dc1) - addMeshGateways(dc2) - addMeshGateways(dc3) + ct.addMeshGateways(dc1) + ct.addMeshGateways(dc2) + if ct.includeDC3 { + ct.addMeshGateways(dc3) + } - setupGlobals(dc1) - setupGlobals(dc2) - setupGlobals(dc3) + ct.setupGlobals(dc1) + ct.setupGlobals(dc2) + if ct.includeDC3 { + ct.setupGlobals(dc3) + } + + networks := []*topology.Network{ + {Name: "wan", Type: "wan"}, + {Name: dc1.Datacenter}, // "dc1" LAN + {Name: dc2.Datacenter}, // "dc2" LAN + } + if ct.includeDC3 { + networks = append(networks, &topology.Network{Name: dc3.Datacenter}) + } // Build final configuration ct.Cfg = &topology.Config{ - Images: utils.TargetImages(), - Networks: []*topology.Network{ - {Name: dc1.Datacenter}, // "dc1" LAN - {Name: dc2.Datacenter}, // "dc2" LAN - {Name: dc3.Datacenter}, // "dc3" LAN - {Name: "wan", Type: "wan"}, - }, - Clusters: []*topology.Cluster{ - dc1, - dc2, - dc3, - }, + Images: utils.TargetImages(), + Networks: networks, + Clusters: clusters, Peerings: peerings, } return &ct @@ -142,10 +178,12 @@ func (ct *commonTopo) postLaunchChecks(t *testing.T) { for _, e := range clu.InitialConfigEntries { if e.GetKind() == api.ExportedServices { asExport := e.(*api.ExportedServicesConfigEntry) - // do we care about the partition? for _, svc := range asExport.Services { for _, con := range svc.Consumers { - // do we care about con.Partition? + // if Peer is unset, this is an export to another partition in the same DC, so we don't need to check it + if con.Peer == "" { + continue + } // TODO: surely there is code to normalize this partition := asExport.Partition if partition == "" { @@ -183,6 +221,9 @@ func (ct *commonTopo) postLaunchChecks(t *testing.T) { // PeerName is how you'd address a remote dc+partition locally // as your peer name. func LocalPeerName(clu *topology.Cluster, partition string) string { + if partition == "" { + partition = "default" + } return fmt.Sprintf("peer-%s-%s", clu.Datacenter, partition) } @@ -227,7 +268,7 @@ func (ct *commonTopo) AddServiceNode(clu *topology.Cluster, svc serviceExt) *top nodeKind := topology.NodeKindClient // TODO: bug in deployer somewhere; it should guard against a KindDataplane node with // DisableServiceMesh services on it; dataplane is only for service-mesh - if !svc.DisableServiceMesh && clu.Datacenter == agentlessDC { + if !svc.DisableServiceMesh && clu.Datacenter == ct.agentlessDC { nodeKind = topology.NodeKindDataplane } @@ -306,13 +347,9 @@ func (ct *commonTopo) ClusterByDatacenter(t *testing.T, name string) *topology.C return nil } -// Since CE config entries do not contain the partition field, -// this func converts default partition to empty string. +// Deprecated: topoutil.ConfigEntryPartition func ConfigEntryPartition(p string) string { - if p == "default" { - return "" // make this CE friendly - } - return p + return topoutil.ConfigEntryPartition(p) } // DisableNode is a no-op if the node is already disabled. @@ -335,7 +372,7 @@ func EnableNode(t *testing.T, cfg *topology.Config, clusterName string, nid topo return cfg } -func setupGlobals(clu *topology.Cluster) { +func (ct *commonTopo) setupGlobals(clu *topology.Cluster) { for _, part := range clu.Partitions { clu.InitialConfigEntries = append(clu.InitialConfigEntries, &api.ProxyConfigEntry{ @@ -349,35 +386,63 @@ func setupGlobals(clu *topology.Cluster) { Mode: api.MeshGatewayModeLocal, }, }, - &api.MeshConfigEntry{ - Peering: &api.PeeringMeshConfig{ - PeerThroughMeshGateways: true, - }, - }, ) + if ct.peerThroughMGW { + clu.InitialConfigEntries = append(clu.InitialConfigEntries, + &api.MeshConfigEntry{ + Peering: &api.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + }, + ) + } } } // addMeshGateways adds a mesh gateway for every partition in the cluster. // Assumes that the LAN network name is equal to datacenter name. -func addMeshGateways(c *topology.Cluster) { +func (ct *commonTopo) addMeshGateways(c *topology.Cluster) { nodeKind := topology.NodeKindClient - if c.Datacenter == agentlessDC { + if c.Datacenter == ct.agentlessDC { nodeKind = topology.NodeKindDataplane } for _, p := range c.Partitions { - c.Nodes = topology.MergeSlices(c.Nodes, topoutil.NewTopologyMeshGatewaySet( + sid, nodes := newTopologyMeshGatewaySet( nodeKind, p.Name, fmt.Sprintf("%s-%s-mgw", c.Name, p.Name), 1, []string{c.Datacenter, "wan"}, nil, - )) + ) + c.Nodes = topology.MergeSlices(c.Nodes, nodes) + // for services exported in the same cluster between partitions, we need + // to export the mesh gateway (but not for peering) + // https://github.com/hashicorp/consul/pull/19052 + consumers := []api.ServiceConsumer{} + for _, cp := range c.Partitions { + if cp.Name == p.Name { + continue + } + consumers = append(consumers, api.ServiceConsumer{ + Partition: cp.Name, + }) + } + if len(consumers) > 0 { + ct.ExportService(c, p.Name, api.ExportedService{ + Name: sid.Name, + Namespace: sid.Namespace, + Consumers: consumers, + }) + } } } -func clusterWithJustServers(name string, numServers int) *topology.Cluster { +func (ct *commonTopo) clusterWithJustServers(name string, numServers int) *topology.Cluster { + nets := []string{name} + if !ct.peerThroughMGW { + nets = append(nets, "wan") + } return &topology.Cluster{ Enterprise: utils.IsEnterprise(), Name: name, @@ -385,7 +450,7 @@ func clusterWithJustServers(name string, numServers int) *topology.Cluster { Nodes: topoutil.NewTopologyServerSet( name+"-server", numServers, - []string{name}, + nets, nil, ), } @@ -446,3 +511,16 @@ func NewFortioServiceWithDefaults( ) *topology.Service { return topoutil.NewFortioServiceWithDefaults(cluster, sid, topology.NodeVersionV1, mut) } + +func newTopologyMeshGatewaySet( + nodeKind topology.NodeKind, + partition string, + namePrefix string, + num int, + networks []string, + mutateFn func(i int, node *topology.Node), +) (topology.ServiceID, []*topology.Node) { + nodes := topoutil.NewTopologyMeshGatewaySet(nodeKind, partition, namePrefix, num, networks, mutateFn) + sid := nodes[0].Services[0].ID + return sid, nodes +} diff --git a/test-integ/topoutil/asserter.go b/test-integ/topoutil/asserter.go index dcf9da52d0..dd0500922c 100644 --- a/test-integ/topoutil/asserter.go +++ b/test-integ/topoutil/asserter.go @@ -198,24 +198,6 @@ func (a *Asserter) HealthyWithPeer(t *testing.T, cluster string, sid topology.Se }) } -func (a *Asserter) UpstreamEndpointHealthy(t *testing.T, svc *topology.Service, upstream *topology.Upstream) { - t.Helper() - node := svc.Node - ip := node.LocalAddress() - port := svc.EnvoyAdminPort - addr := fmt.Sprintf("%s:%d", ip, port) - - client := a.mustGetHTTPClient(t, node.Cluster) - libassert.AssertUpstreamEndpointStatusWithClient(t, - client, - addr, - // TODO: what is default? namespace? partition? - fmt.Sprintf("%s.default.%s.external", upstream.ID.Name, upstream.Peer), - "HEALTHY", - 1, - ) -} - type testingT interface { require.TestingT Helper() diff --git a/test-integ/topoutil/fixtures.go b/test-integ/topoutil/fixtures.go index c8e9afbe10..a0c72e246a 100644 --- a/test-integ/topoutil/fixtures.go +++ b/test-integ/topoutil/fixtures.go @@ -96,6 +96,10 @@ func NewTopologyMeshGatewaySet( mutateFn func(i int, node *topology.Node), ) []*topology.Node { var out []*topology.Node + sid := topology.ServiceID{ + Name: "mesh-gateway", + Partition: ConfigEntryPartition(partition), + } for i := 1; i <= num; i++ { name := namePrefix + strconv.Itoa(i) @@ -104,7 +108,7 @@ func NewTopologyMeshGatewaySet( Partition: partition, Name: name, Services: []*topology.Service{{ - ID: topology.ServiceID{Name: "mesh-gateway"}, + ID: sid, Port: 8443, EnvoyAdminPort: 19000, IsMeshGateway: true, @@ -122,3 +126,12 @@ func NewTopologyMeshGatewaySet( } return out } + +// Since CE config entries do not contain the partition field, +// this func converts default partition to empty string. +func ConfigEntryPartition(p string) string { + if p == "default" { + return "" // make this CE friendly + } + return p +} diff --git a/test/integration/consul-container/libs/cluster/cluster.go b/test/integration/consul-container/libs/cluster/cluster.go index fbee48333e..630fda2bc4 100644 --- a/test/integration/consul-container/libs/cluster/cluster.go +++ b/test/integration/consul-container/libs/cluster/cluster.go @@ -16,13 +16,13 @@ import ( "time" goretry "github.com/avast/retry-go" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" "github.com/teris-io/shortid" "github.com/testcontainers/testcontainers-go" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" ) diff --git a/test/integration/consul-container/test/ratelimit/ratelimit_test.go b/test/integration/consul-container/test/ratelimit/ratelimit_test.go index e598e0ceb8..89293da7f8 100644 --- a/test/integration/consul-container/test/ratelimit/ratelimit_test.go +++ b/test/integration/consul-container/test/ratelimit/ratelimit_test.go @@ -4,17 +4,19 @@ package ratelimit import ( + "context" "fmt" "strings" "testing" "time" - "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/stretchr/testify/require" + libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster" libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" ) const ( @@ -45,10 +47,11 @@ func TestServerRequestRateLimit(t *testing.T) { expectMetric bool } type testCase struct { - description string - cmd string - operations []operation - mode string + description string + cmd string + operations []operation + mode string + enterpriseOnly bool } // getKV and putKV are net/RPC calls @@ -69,6 +72,30 @@ func TestServerRequestRateLimit(t *testing.T) { rateLimitType: "global/write", } + // listPartition and putPartition are gRPC calls + listPartition := action{ + function: func(client *api.Client) error { + ctx := context.Background() + _, _, err := client.Partitions().List(ctx, nil) + return err + }, + rateLimitOperation: "/partition.PartitionService/List", + rateLimitType: "global/read", + } + + putPartition := action{ + function: func(client *api.Client) error { + ctx := context.Background() + p := api.Partition{ + Name: "ptest", + } + _, _, err := client.Partitions().Create(ctx, &p, nil) + return err + }, + rateLimitOperation: "/partition.PartitionService/Write", + rateLimitType: "global/write", + } + testCases := []testCase{ // HTTP & net/RPC { @@ -128,9 +155,73 @@ func TestServerRequestRateLimit(t *testing.T) { }, }, }, + // gRPC + { + description: "GRPC / Mode: disabled - errors: no / exceeded logs: no / metrics: no", + cmd: `-hcl=limits { request_limits { mode = "disabled" read_rate = 0 write_rate = 0 }}`, + mode: "disabled", + operations: []operation{ + { + action: putPartition, + expectedErrorMsg: "", + expectExceededLog: false, + expectMetric: false, + }, + { + action: listPartition, + expectedErrorMsg: "", + expectExceededLog: false, + expectMetric: false, + }, + }, + enterpriseOnly: true, + }, + { + description: "GRPC / Mode: permissive - errors: no / exceeded logs: yes / metrics: no", + cmd: `-hcl=limits { request_limits { mode = "permissive" read_rate = 0 write_rate = 0 }}`, + mode: "permissive", + operations: []operation{ + { + action: putPartition, + expectedErrorMsg: "", + expectExceededLog: true, + expectMetric: true, + }, + { + action: listPartition, + expectedErrorMsg: "", + expectExceededLog: true, + expectMetric: true, + }, + }, + enterpriseOnly: true, + }, + { + description: "GRPC / Mode: enforcing - errors: yes / exceeded logs: yes / metrics: yes", + cmd: `-hcl=limits { request_limits { mode = "enforcing" read_rate = 0 write_rate = 0 }}`, + mode: "enforcing", + operations: []operation{ + { + action: putPartition, + expectedErrorMsg: nonRetryableErrorMsg, + expectExceededLog: true, + expectMetric: true, + }, + { + action: listPartition, + expectedErrorMsg: retryableErrorMsg, + expectExceededLog: true, + expectMetric: true, + }, + }, + enterpriseOnly: true, + }, } for _, tc := range testCases { + if tc.enterpriseOnly && !utils.IsEnterprise() { + continue + } tc := tc t.Run(tc.description, func(t *testing.T) { t.Parallel() diff --git a/testing/deployer/sprawl/internal/build/docker.go b/testing/deployer/sprawl/internal/build/docker.go index b8ca695f9b..435fb0b506 100644 --- a/testing/deployer/sprawl/internal/build/docker.go +++ b/testing/deployer/sprawl/internal/build/docker.go @@ -98,23 +98,26 @@ func DockerImages( run *runner.Runner, t *topology.Topology, ) error { - logw := logger.Named("docker").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug}) built := make(map[string]struct{}) for _, c := range t.Clusters { for _, n := range c.Nodes { joint := n.Images.EnvoyConsulImage() if _, ok := built[joint]; joint != "" && !ok { - logger.Info("building image", "image", joint) - err := run.DockerExec(context.TODO(), []string{ + logger.Info("building envoy+consul image", "image", joint) + logw := logger.Named("docker_envoy_consul").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug}) + + err := run.DockerExecWithStderr(context.TODO(), []string{ "build", + // provenance causes non-idempotent builds, which leads to spurious terraform replacements + "--provenance=false", "--build-arg", "CONSUL_IMAGE=" + n.Images.Consul, "--build-arg", "ENVOY_IMAGE=" + n.Images.Envoy, "-t", joint, "-", - }, logw, strings.NewReader(dockerfileEnvoy)) + }, logw, logw, strings.NewReader(dockerfileEnvoy)) if err != nil { return err } @@ -124,14 +127,16 @@ func DockerImages( cdp := n.Images.LocalDataplaneImage() if _, ok := built[cdp]; cdp != "" && !ok { - logger.Info("building image", "image", cdp) - err := run.DockerExec(context.TODO(), []string{ + logger.Info("building dataplane image", "image", cdp) + logw := logger.Named("docker_dataplane").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug}) + err := run.DockerExecWithStderr(context.TODO(), []string{ "build", + "--provenance=false", "--build-arg", "DATAPLANE_IMAGE=" + n.Images.Dataplane, "-t", cdp, "-", - }, logw, strings.NewReader(dockerfileDataplane)) + }, logw, logw, strings.NewReader(dockerfileDataplane)) if err != nil { return err } @@ -142,7 +147,8 @@ func DockerImages( cdpTproxy := n.Images.LocalDataplaneTProxyImage() if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok { logger.Info("building image", "image", cdpTproxy) - err := run.DockerExec(context.TODO(), []string{ + logw := logger.Named("docker_dataplane_tproxy").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug}) + err := run.DockerExecWithStderr(context.TODO(), []string{ "build", "--build-arg", "DATAPLANE_IMAGE=" + n.Images.Dataplane, @@ -150,7 +156,7 @@ func DockerImages( "CONSUL_IMAGE=" + n.Images.Consul, "-t", cdpTproxy, "-", - }, logw, strings.NewReader(dockerfileDataplaneForTProxy)) + }, logw, logw, strings.NewReader(dockerfileDataplaneForTProxy)) if err != nil { return err } diff --git a/testing/deployer/sprawl/peering.go b/testing/deployer/sprawl/peering.go index dd280cc49a..12c6f3e3ce 100644 --- a/testing/deployer/sprawl/peering.go +++ b/testing/deployer/sprawl/peering.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "log" "net/http" "strings" "time" @@ -62,6 +63,8 @@ func (s *Sprawl) initPeerings() error { req1.Partition = peering.Accepting.Partition } + s.awaitMeshGateways() + GENTOKEN: resp, _, err := acceptingClient.Peerings().GenerateToken(context.Background(), req1, nil) if err != nil { @@ -108,6 +111,7 @@ func (s *Sprawl) initPeerings() error { } func (s *Sprawl) waitForPeeringEstablishment() error { + s.awaitMeshGateways() var ( logger = s.logger.Named("peering") ) @@ -181,3 +185,64 @@ func (s *Sprawl) checkPeeringDirection(logger hclog.Logger, client *api.Client, } logger.Debug("peering is active", "dur", time.Since(startTime).Round(time.Second)) } + +func (s *Sprawl) awaitMeshGateways() { + startTime := time.Now() + s.logger.Info("awaiting mesh gateways") + // TODO: maybe a better way to do this + mgws := []*topology.Service{} + for _, clu := range s.topology.Clusters { + for _, node := range clu.Nodes { + for _, svc := range node.Services { + if svc.IsMeshGateway { + mgws = append(mgws, svc) + } + } + } + } + + // TODO: parallel + for _, mgw := range mgws { + cl := s.clients[mgw.Node.Cluster] + logger := s.logger.With("cluster", mgw.Node.Cluster, "sid", mgw.ID, "nid", mgw.Node.ID()) + logger.Info("awaiting MGW readiness") + RETRY: + // TODO: not sure if there's a better way to check if the MGW is ready + svcs, _, err := cl.Catalog().Service(mgw.ID.Name, "", &api.QueryOptions{ + Namespace: mgw.ID.Namespace, + Partition: mgw.ID.Partition, + }) + if err != nil { + logger.Debug("fetching MGW service", "err", err) + time.Sleep(time.Second) + goto RETRY + } + if len(svcs) < 1 { + logger.Debug("no MGW service in catalog yet") + time.Sleep(time.Second) + goto RETRY + } + if len(svcs) > 1 { + // not sure when this would happen + log.Fatalf("expected 1 MGW service, actually: %#v", svcs) + } + + entries, _, err := cl.Health().Service(mgw.ID.Name, "", true, &api.QueryOptions{ + Namespace: mgw.ID.Namespace, + Partition: mgw.ID.Partition, + }) + if err != nil { + logger.Debug("fetching MGW checks", "err", err) + time.Sleep(time.Second) + goto RETRY + } + if len(entries) != 1 { + logger.Debug("expected 1 MGW entry", "entries", entries) + time.Sleep(time.Second) + goto RETRY + } + + logger.Debug("MGW ready", "entry", *(entries[0]), "dur", time.Since(startTime).Round(time.Second)) + } + s.logger.Info("mesh gateways ready", "dur", time.Since(startTime).Round(time.Second)) +} diff --git a/testing/deployer/sprawl/sprawl.go b/testing/deployer/sprawl/sprawl.go index c9dd1eb4e6..2cf2c9371a 100644 --- a/testing/deployer/sprawl/sprawl.go +++ b/testing/deployer/sprawl/sprawl.go @@ -33,10 +33,12 @@ import ( // Sprawl is the definition of a complete running Consul deployment topology. type Sprawl struct { - logger hclog.Logger - runner *runner.Runner - license string - secrets secrets.Store + logger hclog.Logger + // set after initial Launch is complete + launchLogger hclog.Logger + runner *runner.Runner + license string + secrets secrets.Store workdir string @@ -212,11 +214,20 @@ func Launch( return nil, fmt.Errorf("error gathering diagnostic details: %w", err) } + s.launchLogger = s.logger + return s, nil } func (s *Sprawl) Relaunch( cfg *topology.Config, +) error { + return s.RelaunchWithPhase(cfg, "") +} + +func (s *Sprawl) RelaunchWithPhase( + cfg *topology.Config, + phase string, ) error { // Copy this BEFORE compiling so we capture the original definition, without denorms. var err error @@ -225,6 +236,10 @@ func (s *Sprawl) Relaunch( return err } + if phase != "" { + s.logger = s.launchLogger.Named(phase) + } + newTopology, err := topology.Recompile(s.logger.Named("recompile"), cfg, s.topology) if err != nil { return fmt.Errorf("topology.Compile: %w", err)