From bba3eb8cdd425de7087cc8c0cdfe1214fcc54f1d Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Thu, 9 Jun 2022 11:05:18 -0500 Subject: [PATCH] peering: mesh gateways are required for cross-peer service mesh communication (#13410) Require use of mesh gateways in order for service mesh data plane traffic to flow between peers. This also adds plumbing for envoy integration tests involving peers, and one starter peering test. --- .../grpc/private/services/subscribe/logger.go | 2 + agent/rpc/peering/replication.go | 7 +- agent/rpc/peering/service.go | 6 +- agent/rpc/peering/stream_test.go | 61 +++- agent/rpc/peering/subscription_manager.go | 169 +-------- .../rpc/peering/subscription_manager_test.go | 342 +++++------------- agent/rpc/peering/subscription_state.go | 25 +- agent/rpc/peering/subscription_view.go | 14 - .../connect/envoy/Dockerfile-tcpdump | 2 +- .../envoy/case-cross-peers/alpha/base.hcl | 2 + .../case-cross-peers/alpha/config_entries.hcl | 26 ++ .../alpha/service_gateway.hcl | 5 + .../case-cross-peers/alpha/service_s1.hcl | 1 + .../case-cross-peers/alpha/service_s2.hcl | 7 + .../envoy/case-cross-peers/alpha/setup.sh | 11 + .../envoy/case-cross-peers/alpha/verify.bats | 31 ++ .../connect/envoy/case-cross-peers/bind.hcl | 2 + .../connect/envoy/case-cross-peers/capture.sh | 6 + .../primary/config_entries.hcl | 12 + .../primary/service_gateway.hcl | 5 + .../case-cross-peers/primary/service_s1.hcl | 17 + .../case-cross-peers/primary/service_s2.hcl | 1 + .../envoy/case-cross-peers/primary/setup.sh | 10 + .../case-cross-peers/primary/verify.bats | 57 +++ .../connect/envoy/case-cross-peers/vars.sh | 4 + test/integration/connect/envoy/defaults.sh | 3 +- test/integration/connect/envoy/helpers.bash | 35 +- test/integration/connect/envoy/run-tests.sh | 68 +++- 28 files changed, 443 insertions(+), 488 deletions(-) create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/base.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/config_entries.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/service_gateway.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/service_s1.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/service_s2.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/setup.sh create mode 100644 test/integration/connect/envoy/case-cross-peers/alpha/verify.bats create mode 100644 test/integration/connect/envoy/case-cross-peers/bind.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/capture.sh create mode 100644 test/integration/connect/envoy/case-cross-peers/primary/config_entries.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/primary/service_gateway.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/primary/service_s1.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/primary/service_s2.hcl create mode 100644 test/integration/connect/envoy/case-cross-peers/primary/setup.sh create mode 100644 test/integration/connect/envoy/case-cross-peers/primary/verify.bats create mode 100644 test/integration/connect/envoy/case-cross-peers/vars.sh diff --git a/agent/grpc/private/services/subscribe/logger.go b/agent/grpc/private/services/subscribe/logger.go index 693c8604af..187f12d3b9 100644 --- a/agent/grpc/private/services/subscribe/logger.go +++ b/agent/grpc/private/services/subscribe/logger.go @@ -41,8 +41,10 @@ func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger { return l.With( "topic", req.Topic.String(), "dc", req.Datacenter, + "peer", req.PeerName, "key", req.Key, "namespace", req.Namespace, + "partition", req.Partition, "request_index", req.Index, "stream_id", &streamID{}) } diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go index 1f546bdd32..c8b9726647 100644 --- a/agent/rpc/peering/replication.go +++ b/agent/rpc/peering/replication.go @@ -44,12 +44,7 @@ func makeServiceResponse( return nil } - var serviceName string - if strings.HasPrefix(update.CorrelationID, subExportedService) { - serviceName = strings.TrimPrefix(update.CorrelationID, subExportedService) - } else { - serviceName = strings.TrimPrefix(update.CorrelationID, subExportedProxyService) + syntheticProxyNameSuffix - } + serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) // If no nodes are present then it's due to one of: // 1. The service is newly registered or exported and yielded a transient empty update. diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 1b6d565b11..43f16fb001 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -48,8 +48,6 @@ func (e *errPeeringInvalidServerAddress) Error() string { type Config struct { Datacenter string ConnectEnabled bool - // TODO(peering): remove this when we're ready - DisableMeshGatewayMode bool } // Service implements pbpeering.PeeringService to provide RPC operations for @@ -62,7 +60,6 @@ type Service struct { } func NewService(logger hclog.Logger, cfg Config, backend Backend) *Service { - cfg.DisableMeshGatewayMode = true return &Service{ Backend: backend, logger: logger, @@ -735,8 +732,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { case update := <-subCh: var resp *pbpeering.ReplicationMessage switch { - case strings.HasPrefix(update.CorrelationID, subExportedService), - strings.HasPrefix(update.CorrelationID, subExportedProxyService): + case strings.HasPrefix(update.CorrelationID, subExportedService): resp = makeServiceResponse(logger, update) case strings.HasPrefix(update.CorrelationID, subMeshGateway): diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 7c29a63426..3b7e801c57 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -619,12 +619,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { } func TestStreamResources_Server_ServiceUpdates(t *testing.T) { - testStreamResources_Server_ServiceUpdates(t, true) -} -func TestStreamResources_Server_ServiceUpdates_EnableMeshGateways(t *testing.T) { - testStreamResources_Server_ServiceUpdates(t, false) -} -func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways bool) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) @@ -638,9 +632,8 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways srv := NewService( testutil.Logger(t), Config{ - Datacenter: "dc1", - ConnectEnabled: true, - DisableMeshGatewayMode: disableMeshGateways, + Datacenter: "dc1", + ConnectEnabled: true, }, &testStreamBackend{ store: store, pub: publisher, @@ -659,15 +652,6 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways lastIdx++ require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "foo", &structs.NodeService{ - ID: "mysql-sidecar-proxy", - Service: "mysql-sidecar-proxy", - Kind: structs.ServiceKindConnectProxy, - Port: 5000, - Proxy: structs.ConnectProxyConfig{DestinationServiceName: "mysql"}, - })) - var ( mongoSN = structs.NewServiceName("mongo", nil).String() mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String() @@ -703,12 +687,14 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways // Roots tested in TestStreamResources_Server_CARootUpdates }, func(t *testing.T, msg *pbpeering.ReplicationMessage) { + // no mongo instances exist require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, mongoSN, msg.GetResponse().ResourceID) require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) require.Nil(t, msg.GetResponse().Resource) }, func(t *testing.T, msg *pbpeering.ReplicationMessage) { + // proxies can't export because no mesh gateway exists yet require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) @@ -723,6 +709,41 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.Len(t, nodes.Nodes, 1) }, + func(t *testing.T, msg *pbpeering.ReplicationMessage) { + // proxies can't export because no mesh gateway exists yet + require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) + require.Nil(t, msg.GetResponse().Resource) + }, + ) + }) + + testutil.RunStep(t, "register mesh gateway to send proxy updates", func(t *testing.T) { + gateway := &structs.CheckServiceNode{Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, + Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, + // TODO: checks + } + + lastIdx++ + require.NoError(t, store.EnsureNode(lastIdx, gateway.Node)) + + lastIdx++ + require.NoError(t, store.EnsureService(lastIdx, "mgw", gateway.Service)) + + expectReplEvents(t, client, + func(t *testing.T, msg *pbpeering.ReplicationMessage) { + require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) + require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) + + var nodes pbservice.IndexedCheckServiceNodes + require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) + require.Len(t, nodes.Nodes, 1) + + svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo" + require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID) + }, func(t *testing.T, msg *pbpeering.ReplicationMessage) { require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) @@ -1196,9 +1217,11 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } } + const timeout = 10 * time.Second + var out []*pbpeering.ReplicationMessage for len(out) < num { - msg, err := client.RecvWithTimeout(100 * time.Millisecond) + msg, err := client.RecvWithTimeout(timeout) if err == io.EOF && msg == nil { t.Fatalf("timed out with %d of %d events", len(out), num) } diff --git a/agent/rpc/peering/subscription_manager.go b/agent/rpc/peering/subscription_manager.go index 42409fd14b..e6d0d0e2ab 100644 --- a/agent/rpc/peering/subscription_manager.go +++ b/agent/rpc/peering/subscription_manager.go @@ -75,12 +75,9 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p // Wrap our bare state store queries in goroutines that emit events. go m.notifyExportedServicesForPeerID(ctx, state, peerID) - if !m.config.DisableMeshGatewayMode && m.config.ConnectEnabled { - go m.notifyMeshGatewaysForPartition(ctx, state, state.partition) - } - - // If connect is enabled, watch for updates to CA roots. if m.config.ConnectEnabled { + go m.notifyMeshGatewaysForPartition(ctx, state, state.partition) + // If connect is enabled, watch for updates to CA roots. go m.notifyRootCAUpdates(ctx, state.updateCh) } @@ -129,12 +126,8 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti pending := &pendingPayload{} m.syncNormalServices(ctx, state, pending, evt.Services) - if m.config.DisableMeshGatewayMode { - m.syncProxyServices(ctx, state, pending, evt.Services) - } else { - if m.config.ConnectEnabled { - m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains()) - } + if m.config.ConnectEnabled { + m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains()) } state.sendPendingEvents(ctx, m.logger, pending) @@ -152,32 +145,25 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti // Clear this raft index before exporting. csn.Index = 0 - if !m.config.DisableMeshGatewayMode { - // Ensure that connect things are scrubbed so we don't mix-and-match - // with the synthetic entries that point to mesh gateways. - filterConnectReferences(csn) + // Ensure that connect things are scrubbed so we don't mix-and-match + // with the synthetic entries that point to mesh gateways. + filterConnectReferences(csn) - // Flatten health checks - for _, instance := range csn.Nodes { - instance.Checks = flattenChecks( - instance.Node.Node, - instance.Service.ID, - instance.Service.Service, - instance.Service.EnterpriseMeta, - instance.Checks, - ) - } + // Flatten health checks + for _, instance := range csn.Nodes { + instance.Checks = flattenChecks( + instance.Node.Node, + instance.Service.ID, + instance.Service.Service, + instance.Service.EnterpriseMeta, + instance.Checks, + ) } // Scrub raft indexes for _, instance := range csn.Nodes { instance.Node.RaftIndex = nil instance.Service.RaftIndex = nil - if m.config.DisableMeshGatewayMode { - for _, chk := range instance.Checks { - chk.RaftIndex = nil - } - } // skip checks since we just generated one from scratch } @@ -197,61 +183,6 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti } state.sendPendingEvents(ctx, m.logger, pending) - case strings.HasPrefix(u.CorrelationID, subExportedProxyService): - csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes) - if !ok { - return fmt.Errorf("invalid type for response: %T", u.Result) - } - - if !m.config.DisableMeshGatewayMode { - return nil // ignore event - } - - sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, subExportedProxyService)) - spiffeID := connect.SpiffeIDService{ - Host: m.trustDomain, - Partition: sn.PartitionOrDefault(), - Namespace: sn.NamespaceOrDefault(), - Datacenter: m.config.Datacenter, - Service: sn.Name, - } - sni := connect.PeeredServiceSNI( - sn.Name, - sn.NamespaceOrDefault(), - sn.PartitionOrDefault(), - state.peerName, - m.trustDomain, - ) - peerMeta := &pbservice.PeeringServiceMeta{ - SNI: []string{sni}, - SpiffeID: []string{spiffeID.URI().String()}, - Protocol: "tcp", - } - - // skip checks since we just generated one from scratch - // Set peerMeta on all instances and scrub the raft indexes. - for _, instance := range csn.Nodes { - instance.Service.Connect.PeerMeta = peerMeta - - instance.Node.RaftIndex = nil - instance.Service.RaftIndex = nil - if m.config.DisableMeshGatewayMode { - for _, chk := range instance.Checks { - chk.RaftIndex = nil - } - } - } - csn.Index = 0 - - id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService) - - // Just ferry this one directly along to the destination. - pending := &pendingPayload{} - if err := pending.Add(id, u.CorrelationID, csn); err != nil { - return err - } - state.sendPendingEvents(ctx, m.logger, pending) - case strings.HasPrefix(u.CorrelationID, subMeshGateway): csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes) if !ok { @@ -260,7 +191,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway) - if m.config.DisableMeshGatewayMode || !m.config.ConnectEnabled { + if !m.config.ConnectEnabled { return nil // ignore event } @@ -510,57 +441,6 @@ func (m *subscriptionManager) syncNormalServices( } } -// TODO(peering): remove -func (m *subscriptionManager) syncProxyServices( - ctx context.Context, - state *subscriptionState, - pending *pendingPayload, - services []structs.ServiceName, -) { - // seen contains the set of exported service names and is used to reconcile the list of watched services. - seen := make(map[structs.ServiceName]struct{}) - - // Ensure there is a subscription for each service exported to the peer. - for _, svc := range services { - seen[svc] = struct{}{} - - if _, ok := state.watchedProxyServices[svc]; ok { - // Exported service is already being watched, nothing to do. - continue - } - - notifyCtx, cancel := context.WithCancel(ctx) - if err := m.NotifyConnectProxyService(notifyCtx, svc, state.updateCh); err != nil { - cancel() - m.logger.Error("failed to subscribe to proxy service", "service", svc.String()) - continue - } - - state.watchedProxyServices[svc] = cancel - } - - // For every subscription without an exported service, call the associated cancel fn. - for svc, cancel := range state.watchedProxyServices { - if _, ok := seen[svc]; !ok { - cancel() - - delete(state.watchedProxyServices, svc) - - // Send an empty event to the stream handler to trigger sending a DELETE message. - // Cancelling the subscription context above is necessary, but does not yield a useful signal on its own. - err := pending.Add( - proxyServicePayloadIDPrefix+svc.String(), - subExportedProxyService+svc.String(), - &pbservice.IndexedCheckServiceNodes{}, - ) - if err != nil { - m.logger.Error("failed to send event for proxy service", "service", svc.String(), "error", err) - continue - } - } - } -} - func (m *subscriptionManager) syncDiscoveryChains( ctx context.Context, state *subscriptionState, @@ -761,10 +641,9 @@ func flattenChecks( } const ( - subExportedServiceList = "exported-service-list" - subExportedService = "exported-service:" - subExportedProxyService = "exported-proxy-service:" - subMeshGateway = "mesh-gateway:" + subExportedServiceList = "exported-service-list" + subExportedService = "exported-service:" + subMeshGateway = "mesh-gateway:" ) // NotifyStandardService will notify the given channel when there are updates @@ -777,14 +656,6 @@ func (m *subscriptionManager) NotifyStandardService( sr := newExportedStandardServiceRequest(m.logger, svc, m.backend) return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh) } -func (m *subscriptionManager) NotifyConnectProxyService( - ctx context.Context, - svc structs.ServiceName, - updateCh chan<- cache.UpdateEvent, -) error { - sr := newExportedConnectProxyServiceRequest(m.logger, svc, m.backend) - return m.viewStore.Notify(ctx, sr, subExportedProxyService+svc.String(), updateCh) -} // syntheticProxyNameSuffix is the suffix to add to synthetic proxies we // replicate to route traffic to an exported discovery chain through the mesh diff --git a/agent/rpc/peering/subscription_manager_test.go b/agent/rpc/peering/subscription_manager_test.go index 299fd9ad81..a7c49090b5 100644 --- a/agent/rpc/peering/subscription_manager_test.go +++ b/agent/rpc/peering/subscription_manager_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/durationpb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" @@ -23,12 +22,6 @@ import ( ) func TestSubscriptionManager_RegisterDeregister(t *testing.T) { - testSubscriptionManager_RegisterDeregister(t, true) -} -func TestSubscriptionManager_RegisterDeregister_EnableMeshGateways(t *testing.T) { - testSubscriptionManager_RegisterDeregister(t, false) -} -func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateways bool) { backend := newTestSubscriptionBackend(t) // initialCatalogIdx := backend.lastIdx @@ -40,9 +33,8 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ - Datacenter: "dc1", - ConnectEnabled: true, - DisableMeshGatewayMode: disableMeshGateways, + Datacenter: "dc1", + ConnectEnabled: true, }, connect.TestTrustDomain, backend) subCh := mgr.subscribe(ctx, id, "my-peering", partition) @@ -52,18 +44,12 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String() mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() - - mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String() ) - if disableMeshGateways { - expectEvents(t, subCh) - } else { - // Expect just the empty mesh gateway event to replicate. - expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, gatewayCorrID, 0) - }) - } + // Expect just the empty mesh gateway event to replicate. + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 0) + }) testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) { backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{ @@ -84,25 +70,14 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway }, }) - if disableMeshGateways { - expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlProxyCorrID_temp, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlCorrID, 0) - }, - ) - } else { - expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlProxyCorrID, 0) - }, - ) - } + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID, 0) + }, + ) }) mysql1 := &structs.CheckServiceNode{ @@ -125,17 +100,10 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway require.Len(t, res.Nodes, 1) - if disableMeshGateways { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), - }, res.Nodes[0]) - } else { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService("", "mysql-1", "mysql", 5000, nil), - }, res.Nodes[0]) - } + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + }, res.Nodes[0]) }) backend.ensureCheck(t, mysql1.Checks[0]) @@ -148,23 +116,13 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway require.Len(t, res.Nodes, 1) - if disableMeshGateways { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil), - }, - }, res.Nodes[0]) - } else { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService("", "mysql-1", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck("foo", "mysql-1", "mysql", "critical", nil), - }, - }, res.Nodes[0]) - } + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("foo", "mysql-1", "mysql", "critical", nil), + }, + }, res.Nodes[0]) }) }) @@ -188,31 +146,17 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway require.Len(t, res.Nodes, 2) - if disableMeshGateways { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("bar", "10.0.0.2", partition), - Service: pbService_temp("", "mysql-2", "mysql", 5000, nil), - }, res.Nodes[0]) - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil), - }, - }, res.Nodes[1]) - } else { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("bar", "10.0.0.2", partition), - Service: pbService("", "mysql-2", "mysql", 5000, nil), - }, res.Nodes[0]) - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService("", "mysql-1", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck("foo", "mysql-1", "mysql", "critical", nil), - }, - }, res.Nodes[1]) - } + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService("", "mysql-2", "mysql", 5000, nil), + }, res.Nodes[0]) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("foo", "mysql-1", "mysql", "critical", nil), + }, + }, res.Nodes[1]) }) backend.ensureCheck(t, mysql2.Checks[0]) @@ -224,37 +168,20 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 2) - if disableMeshGateways { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("bar", "10.0.0.2", partition), - Service: pbService_temp("", "mysql-2", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil), - }, - }, res.Nodes[0]) - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil), - }, - }, res.Nodes[1]) - } else { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("bar", "10.0.0.2", partition), - Service: pbService("", "mysql-2", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck("bar", "mysql-2", "mysql", "critical", nil), - }, - }, res.Nodes[0]) - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("foo", "10.0.0.1", partition), - Service: pbService("", "mysql-1", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck("foo", "mysql-1", "mysql", "critical", nil), - }, - }, res.Nodes[1]) - } + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService("", "mysql-2", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("bar", "mysql-2", "mysql", "critical", nil), + }, + }, res.Nodes[0]) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("foo", "mysql-1", "mysql", "critical", nil), + }, + }, res.Nodes[1]) }) }) @@ -284,31 +211,17 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 1) - if disableMeshGateways { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("bar", "10.0.0.2", partition), - Service: pbService_temp("", "mysql-2", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil), - }, - }, res.Nodes[0]) - } else { - prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ - Node: pbNode("bar", "10.0.0.2", partition), - Service: pbService("", "mysql-2", "mysql", 5000, nil), - Checks: []*pbservice.HealthCheck{ - pbCheck("bar", "mysql-2", "mysql", "critical", nil), - }, - }, res.Nodes[0]) - } + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService("", "mysql-2", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("bar", "mysql-2", "mysql", "critical", nil), + }, + }, res.Nodes[0]) }) }) testutil.RunStep(t, "register mesh gateway to send proxy updates", func(t *testing.T) { - if disableMeshGateways { - t.Skip() - return - } gateway := &structs.CheckServiceNode{ Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, @@ -381,10 +294,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway }) testutil.RunStep(t, "deregister mesh gateway to send proxy removals", func(t *testing.T) { - if disableMeshGateways { - t.Skip() - return - } backend.deleteService(t, "mgw", "gateway-1") expectEvents(t, subCh, @@ -407,12 +316,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway } func TestSubscriptionManager_InitialSnapshot(t *testing.T) { - testSubscriptionManager_InitialSnapshot(t, true) -} -func TestSubscriptionManager_InitialSnapshot_EnableMeshGateways(t *testing.T) { - testSubscriptionManager_InitialSnapshot(t, false) -} -func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways bool) { backend := newTestSubscriptionBackend(t) // initialCatalogIdx := backend.lastIdx @@ -424,9 +327,8 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ - Datacenter: "dc1", - ConnectEnabled: true, - DisableMeshGatewayMode: disableMeshGateways, + Datacenter: "dc1", + ConnectEnabled: true, }, connect.TestTrustDomain, backend) subCh := mgr.subscribe(ctx, id, "my-peering", partition) @@ -455,20 +357,12 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String() chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String() - - mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String() - mongoProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mongo", nil).String() - chainProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("chain", nil).String() ) - if disableMeshGateways { - expectEvents(t, subCh) - } else { - // Expect just the empty mesh gateway event to replicate. - expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, gatewayCorrID, 0) - }) - } + // Expect just the empty mesh gateway event to replicate. + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 0) + }) // At this point in time we'll have a mesh-gateway notification with no // content stored and handled. @@ -497,56 +391,29 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b }, }) - if disableMeshGateways { - expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, chainProxyCorrID_temp, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mongoProxyCorrID_temp, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlProxyCorrID_temp, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, chainCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) - }, - ) - } else { - expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, chainCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, chainProxyCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mongoProxyCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlProxyCorrID, 0) - }, - ) - } + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainProxyCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoProxyCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID, 0) + }, + ) }) testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) { - if disableMeshGateways { - t.Skip() - return - } gateway := &structs.CheckServiceNode{ Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, @@ -850,29 +717,6 @@ func pbService(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMe } } -func pbService_temp(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMeta) *pbservice.NodeService { - if entMeta == nil { - entMeta = pbcommon.DefaultEnterpriseMeta - } - return &pbservice.NodeService{ - ID: id, - Kind: kind, - Service: name, - Port: port, - Weights: &pbservice.Weights{ - Passing: 1, - Warning: 1, - }, - EnterpriseMeta: entMeta, - Connect: &pbservice.ServiceConnect{}, - Proxy: &pbservice.ConnectProxyConfig{ - MeshGateway: &pbservice.MeshGatewayConfig{}, - Expose: &pbservice.ExposeConfig{}, - TransparentProxy: &pbservice.TransparentProxyConfig{}, - }, - } -} - func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck { if entMeta == nil { entMeta = pbcommon.DefaultEnterpriseMeta @@ -887,23 +731,3 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe EnterpriseMeta: entMeta, } } - -func pbCheck_temp(node, svcID, svcName, checkID, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck { - if entMeta == nil { - entMeta = pbcommon.DefaultEnterpriseMeta - } - return &pbservice.HealthCheck{ - Node: node, - CheckID: checkID, - Status: status, - ServiceID: svcID, - ServiceName: svcName, - EnterpriseMeta: entMeta, - Definition: &pbservice.HealthCheckDefinition{ - DeregisterCriticalServiceAfter: durationpb.New(0), - Interval: durationpb.New(0), - TTL: durationpb.New(0), - Timeout: durationpb.New(0), - }, - } -} diff --git a/agent/rpc/peering/subscription_state.go b/agent/rpc/peering/subscription_state.go index 29bbff967c..bd9da52e8e 100644 --- a/agent/rpc/peering/subscription_state.go +++ b/agent/rpc/peering/subscription_state.go @@ -25,9 +25,8 @@ type subscriptionState struct { // plain data exportList *structs.ExportedServiceList - watchedServices map[structs.ServiceName]context.CancelFunc - watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove - connectServices map[structs.ServiceName]string // value:protocol + watchedServices map[structs.ServiceName]context.CancelFunc + connectServices map[structs.ServiceName]string // value:protocol // eventVersions is a duplicate event suppression system keyed by the "id" // not the "correlationID" @@ -46,12 +45,11 @@ type subscriptionState struct { func newSubscriptionState(peerName, partition string) *subscriptionState { return &subscriptionState{ - peerName: peerName, - partition: partition, - watchedServices: make(map[structs.ServiceName]context.CancelFunc), - watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc), - connectServices: make(map[structs.ServiceName]string), - eventVersions: make(map[string]string), + peerName: peerName, + partition: partition, + watchedServices: make(map[structs.ServiceName]context.CancelFunc), + connectServices: make(map[structs.ServiceName]string), + eventVersions: make(map[string]string), } } @@ -103,14 +101,6 @@ func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) { keep = true } - case strings.HasPrefix(id, proxyServicePayloadIDPrefix): - name := strings.TrimPrefix(id, proxyServicePayloadIDPrefix) - sn := structs.ServiceNameFromString(name) - - if _, ok := s.watchedProxyServices[sn]; ok { - keep = true - } - case strings.HasPrefix(id, discoveryChainPayloadIDPrefix): name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix) sn := structs.ServiceNameFromString(name) @@ -142,7 +132,6 @@ const ( caRootsPayloadID = "roots" meshGatewayPayloadID = "mesh-gateway" servicePayloadIDPrefix = "service:" - proxyServicePayloadIDPrefix = "proxy-service:" // TODO(peering): remove discoveryChainPayloadIDPrefix = "chain:" ) diff --git a/agent/rpc/peering/subscription_view.go b/agent/rpc/peering/subscription_view.go index 2cd2a6ab2d..e002565f7b 100644 --- a/agent/rpc/peering/subscription_view.go +++ b/agent/rpc/peering/subscription_view.go @@ -37,20 +37,6 @@ func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceN } } -// TODO(peering): remove -func newExportedConnectProxyServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest { - req := structs.ServiceSpecificRequest{ - ServiceName: svc.Name, - Connect: true, - EnterpriseMeta: svc.EnterpriseMeta, - } - return &exportedServiceRequest{ - logger: logger, - req: req, - sub: sub, - } -} - // CacheInfo implements submatview.Request func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo { return e.req.CacheInfo() diff --git a/test/integration/connect/envoy/Dockerfile-tcpdump b/test/integration/connect/envoy/Dockerfile-tcpdump index 8b56589523..03116e8f5a 100644 --- a/test/integration/connect/envoy/Dockerfile-tcpdump +++ b/test/integration/connect/envoy/Dockerfile-tcpdump @@ -1,4 +1,4 @@ -FROM alpine:latest +FROM alpine:3.12 RUN apk add --no-cache tcpdump VOLUME [ "/data" ] diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/base.hcl b/test/integration/connect/envoy/case-cross-peers/alpha/base.hcl new file mode 100644 index 0000000000..68265638f9 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/base.hcl @@ -0,0 +1,2 @@ +primary_datacenter = "alpha" +log_level = "trace" diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/config_entries.hcl b/test/integration/connect/envoy/case-cross-peers/alpha/config_entries.hcl new file mode 100644 index 0000000000..64d0117020 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/config_entries.hcl @@ -0,0 +1,26 @@ +config_entries { + bootstrap = [ + { + kind = "proxy-defaults" + name = "global" + + config { + protocol = "tcp" + } + }, + { + kind = "exported-services" + name = "default" + services = [ + { + name = "s2" + consumers = [ + { + peer_name = "alpha-to-primary" + } + ] + } + ] + } + ] +} diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/service_gateway.hcl b/test/integration/connect/envoy/case-cross-peers/alpha/service_gateway.hcl new file mode 100644 index 0000000000..bcdcb2e8b3 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/service_gateway.hcl @@ -0,0 +1,5 @@ +services { + name = "mesh-gateway" + kind = "mesh-gateway" + port = 4432 +} diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/service_s1.hcl b/test/integration/connect/envoy/case-cross-peers/alpha/service_s1.hcl new file mode 100644 index 0000000000..e97ec23666 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/service_s1.hcl @@ -0,0 +1 @@ +# We don't want an s1 service in this peer diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/service_s2.hcl b/test/integration/connect/envoy/case-cross-peers/alpha/service_s2.hcl new file mode 100644 index 0000000000..01d4505c67 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/service_s2.hcl @@ -0,0 +1,7 @@ +services { + name = "s2" + port = 8181 + connect { + sidecar_service {} + } +} diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/setup.sh b/test/integration/connect/envoy/case-cross-peers/alpha/setup.sh new file mode 100644 index 0000000000..820506ea9b --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/setup.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -euo pipefail + +register_services alpha + +gen_envoy_bootstrap s2 19002 alpha +gen_envoy_bootstrap mesh-gateway 19003 alpha true + +wait_for_config_entry proxy-defaults global alpha +wait_for_config_entry exported-services default alpha diff --git a/test/integration/connect/envoy/case-cross-peers/alpha/verify.bats b/test/integration/connect/envoy/case-cross-peers/alpha/verify.bats new file mode 100644 index 0000000000..6ace2b465d --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/alpha/verify.bats @@ -0,0 +1,31 @@ +#!/usr/bin/env bats + +load helpers + +@test "s2 proxy is running correct version" { + assert_envoy_version 19002 +} + +@test "s2 proxy admin is up on :19002" { + retry_default curl -f -s localhost:19002/stats -o /dev/null +} + +@test "gateway-alpha proxy admin is up on :19003" { + retry_default curl -f -s localhost:19003/stats -o /dev/null +} + +@test "s2 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21000 s2 alpha +} + +@test "s2 proxy should be healthy" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "gateway-alpha should be up and listening" { + retry_long nc -z consul-alpha:4432 +} + +@test "s2 proxies should be healthy" { + assert_service_has_healthy_instances s2 1 alpha +} diff --git a/test/integration/connect/envoy/case-cross-peers/bind.hcl b/test/integration/connect/envoy/case-cross-peers/bind.hcl new file mode 100644 index 0000000000..f54393f03e --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/bind.hcl @@ -0,0 +1,2 @@ +bind_addr = "0.0.0.0" +advertise_addr = "{{ GetInterfaceIP \"eth0\" }}" \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peers/capture.sh b/test/integration/connect/envoy/case-cross-peers/capture.sh new file mode 100644 index 0000000000..2419a58cdd --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/capture.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +snapshot_envoy_admin localhost:19000 s1 primary || true +snapshot_envoy_admin localhost:19001 mesh-gateway primary || true +snapshot_envoy_admin localhost:19002 s2 alpha || true +snapshot_envoy_admin localhost:19003 mesh-gateway alpha || true diff --git a/test/integration/connect/envoy/case-cross-peers/primary/config_entries.hcl b/test/integration/connect/envoy/case-cross-peers/primary/config_entries.hcl new file mode 100644 index 0000000000..b3a8d91715 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/primary/config_entries.hcl @@ -0,0 +1,12 @@ +config_entries { + bootstrap = [ + { + kind = "proxy-defaults" + name = "global" + + config { + protocol = "tcp" + } + } + ] +} diff --git a/test/integration/connect/envoy/case-cross-peers/primary/service_gateway.hcl b/test/integration/connect/envoy/case-cross-peers/primary/service_gateway.hcl new file mode 100644 index 0000000000..831a70ff32 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/primary/service_gateway.hcl @@ -0,0 +1,5 @@ +services { + name = "mesh-gateway" + kind = "mesh-gateway" + port = 4431 +} diff --git a/test/integration/connect/envoy/case-cross-peers/primary/service_s1.hcl b/test/integration/connect/envoy/case-cross-peers/primary/service_s1.hcl new file mode 100644 index 0000000000..0e3dcbc3ed --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/primary/service_s1.hcl @@ -0,0 +1,17 @@ +services { + name = "s1" + port = 8080 + connect { + sidecar_service { + proxy { + upstreams = [ + { + destination_name = "s2" + destination_peer = "primary-to-alpha" + local_bind_port = 5000 + } + ] + } + } + } +} diff --git a/test/integration/connect/envoy/case-cross-peers/primary/service_s2.hcl b/test/integration/connect/envoy/case-cross-peers/primary/service_s2.hcl new file mode 100644 index 0000000000..77164e722b --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/primary/service_s2.hcl @@ -0,0 +1 @@ +# We don't want an s2 service in the primary dc \ No newline at end of file diff --git a/test/integration/connect/envoy/case-cross-peers/primary/setup.sh b/test/integration/connect/envoy/case-cross-peers/primary/setup.sh new file mode 100644 index 0000000000..38122406d2 --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/primary/setup.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -euo pipefail + +register_services primary + +gen_envoy_bootstrap s1 19000 primary +gen_envoy_bootstrap mesh-gateway 19001 primary true + +wait_for_config_entry proxy-defaults global diff --git a/test/integration/connect/envoy/case-cross-peers/primary/verify.bats b/test/integration/connect/envoy/case-cross-peers/primary/verify.bats new file mode 100644 index 0000000000..1e6d34c4ac --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/primary/verify.bats @@ -0,0 +1,57 @@ +#!/usr/bin/env bats + +load helpers + +@test "s1 proxy is running correct version" { + assert_envoy_version 19000 +} + +@test "s1 proxy admin is up on :19000" { + retry_default curl -f -s localhost:19000/stats -o /dev/null +} + +@test "gateway-primary proxy admin is up on :19001" { + retry_default curl -f -s localhost:19001/stats -o /dev/null +} + +@test "s1 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21000 s1 +} + +@test "s2 proxies should be healthy in alpha" { + assert_service_has_healthy_instances s2 1 alpha +} + +@test "gateway-primary should be up and listening" { + retry_long nc -z consul-primary:4431 +} + +@test "gateway-alpha should be up and listening" { + retry_long nc -z consul-alpha:4432 +} + +@test "peer the two clusters together" { + create_peering primary alpha +} + +@test "s2 alpha proxies should be healthy in primary" { + assert_service_has_healthy_instances s2 1 primary "" "" primary-to-alpha +} + +@test "gateway-alpha should have healthy endpoints for s2" { + assert_upstream_has_endpoints_in_status consul-alpha:19003 s2.default.alpha HEALTHY 1 +} + +@test "s1 upstream should have healthy endpoints for s2" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2.default.default.alpha-to-primary.external HEALTHY 1 +} + +@test "s1 upstream should be able to connect to s2" { + run retry_default curl -s -f -d hello localhost:5000 + [ "$status" -eq 0 ] + [ "$output" = "hello" ] +} + +@test "s1 upstream made 1 connection to s2" { + assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.s2.default.default.alpha-to-primary.external.*cx_total" 1 +} diff --git a/test/integration/connect/envoy/case-cross-peers/vars.sh b/test/integration/connect/envoy/case-cross-peers/vars.sh new file mode 100644 index 0000000000..388d22b8bb --- /dev/null +++ b/test/integration/connect/envoy/case-cross-peers/vars.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +export REQUIRED_SERVICES="s1 s1-sidecar-proxy gateway-primary s2-alpha s2-sidecar-proxy-alpha gateway-alpha tcpdump-primary tcpdump-alpha" +export REQUIRE_PEERS=1 diff --git a/test/integration/connect/envoy/defaults.sh b/test/integration/connect/envoy/defaults.sh index ccf85824d6..d6bdc8f456 100644 --- a/test/integration/connect/envoy/defaults.sh +++ b/test/integration/connect/envoy/defaults.sh @@ -3,4 +3,5 @@ export DEFAULT_REQUIRED_SERVICES="s1 s1-sidecar-proxy s2 s2-sidecar-proxy" export REQUIRED_SERVICES="${DEFAULT_REQUIRED_SERVICES}" export REQUIRE_SECONDARY=0 -export REQUIRE_PARTITIONS=0 \ No newline at end of file +export REQUIRE_PARTITIONS=0 +export REQUIRE_PEERS=0 diff --git a/test/integration/connect/envoy/helpers.bash b/test/integration/connect/envoy/helpers.bash index 5e27cc6e28..eb3a4ab254 100755 --- a/test/integration/connect/envoy/helpers.bash +++ b/test/integration/connect/envoy/helpers.bash @@ -357,7 +357,6 @@ function get_upstream_endpoint_in_status_count { local HEALTH_STATUS=$3 run curl -s -f "http://${HOSTPORT}/clusters?format=json" [ "$status" -eq 0 ] - # echo "$output" >&3 echo "$output" | jq --raw-output " .cluster_statuses[] | select(.name|startswith(\"${CLUSTER_NAME}\")) @@ -477,8 +476,11 @@ function get_healthy_service_count { local SERVICE_NAME=$1 local DC=$2 local NS=$3 + local AP=$4 + local PEER_NAME=$5 + + run curl -s -f ${HEADERS} "consul-${DC}:8500/v1/health/connect/${SERVICE_NAME}?passing&ns=${NS}&partition=${AP}&peer=${PEER_NAME}" - run curl -s -f ${HEADERS} "127.0.0.1:8500/v1/health/connect/${SERVICE_NAME}?dc=${DC}&passing&ns=${NS}" [ "$status" -eq 0 ] echo "$output" | jq --raw-output '. | length' } @@ -508,9 +510,11 @@ function assert_service_has_healthy_instances_once { local SERVICE_NAME=$1 local EXPECT_COUNT=$2 local DC=${3:-primary} - local NS=$4 + local NS=${4:-} + local AP=${5:-} + local PEER_NAME=${6:-} - GOT_COUNT=$(get_healthy_service_count "$SERVICE_NAME" "$DC" "$NS") + GOT_COUNT=$(get_healthy_service_count "$SERVICE_NAME" "$DC" "$NS" "$AP" "$PEER_NAME") [ "$GOT_COUNT" -eq $EXPECT_COUNT ] } @@ -519,9 +523,11 @@ function assert_service_has_healthy_instances { local SERVICE_NAME=$1 local EXPECT_COUNT=$2 local DC=${3:-primary} - local NS=$4 + local NS=${4:-} + local AP=${5:-} + local PEER_NAME=${6:-} - run retry_long assert_service_has_healthy_instances_once "$SERVICE_NAME" "$EXPECT_COUNT" "$DC" "$NS" + run retry_long assert_service_has_healthy_instances_once "$SERVICE_NAME" "$EXPECT_COUNT" "$DC" "$NS" "$AP" "$PEER_NAME" [ "$status" -eq 0 ] } @@ -941,3 +947,20 @@ function assert_expected_fortio_host_header { return 1 fi } + +function create_peering { + local GENERATE_PEER=$1 + local INITIATE_PEER=$2 + run curl -sL -XPOST "http://consul-${GENERATE_PEER}:8500/v1/peering/token" -d"{ \"PeerName\" : \"${GENERATE_PEER}-to-${INITIATE_PEER}\" }" + # echo "$output" >&3 + [ "$status" == 0 ] + + local token + token="$(echo "$output" | jq -r .PeeringToken)" + [ -n "$token" ] + + run curl -sLv -XPOST "http://consul-${INITIATE_PEER}:8500/v1/peering/initiate" -d"{ \"PeerName\" : \"${INITIATE_PEER}-to-${GENERATE_PEER}\", \"PeeringToken\" : \"${token}\" }" + # echo "$output" >&3 + [ "$status" == 0 ] +} + diff --git a/test/integration/connect/envoy/run-tests.sh b/test/integration/connect/envoy/run-tests.sh index 07ba6a19eb..5f83e804de 100755 --- a/test/integration/connect/envoy/run-tests.sh +++ b/test/integration/connect/envoy/run-tests.sh @@ -127,13 +127,21 @@ function start_consul { '-p=9411:9411' '-p=16686:16686' ) - if [[ $DC == 'secondary' ]]; then + case "$DC" in + secondary) ports=( '-p=9500:8500' '-p=9502:8502' ) - fi - + ;; + alpha) + ports=( + '-p=9510:8500' + '-p=9512:8502' + ) + ;; + esac + license="${CONSUL_LICENSE:-}" # load the consul license so we can pass it into the consul # containers as an env var in the case that this is a consul @@ -269,7 +277,10 @@ function capture_logs { then services="$services consul-ap1" fi - + if is_set $REQUIRE_PEERS + then + services="$services consul-alpha" + fi if [ -f "${CASE_DIR}/capture.sh" ] then @@ -289,7 +300,7 @@ function stop_services { # Teardown docker_kill_rm $REQUIRED_SERVICES - docker_kill_rm consul-primary consul-secondary consul-ap1 + docker_kill_rm consul-primary consul-secondary consul-ap1 consul-alpha } function init_vars { @@ -332,6 +343,10 @@ function run_tests { then init_workdir ap1 fi + if is_set $REQUIRE_PEERS + then + init_workdir alpha + fi global_setup @@ -357,6 +372,9 @@ function run_tests { docker_consul "primary" consul partition create -name ap1 > /dev/null start_partitioned_client ap1 fi + if is_set $REQUIRE_PEERS; then + start_consul alpha + fi echo "Setting up the primary datacenter" pre_service_setup primary @@ -369,6 +387,10 @@ function run_tests { echo "Setting up the non-default partition" pre_service_setup ap1 fi + if is_set $REQUIRE_PEERS; then + echo "Setting up the alpha peer" + pre_service_setup alpha + fi echo "Starting services" start_services @@ -381,6 +403,10 @@ function run_tests { echo "Verifying the secondary datacenter" verify secondary fi + if is_set $REQUIRE_PEERS; then + echo "Verifying the alpha peer" + verify alpha + fi } function test_teardown { @@ -435,13 +461,13 @@ function suite_setup { } function suite_teardown { - docker_kill_rm verify-primary verify-secondary + docker_kill_rm verify-primary verify-secondary verify-alpha # this is some hilarious magic docker_kill_rm $(grep "^function run_container_" $self_name | \ sed 's/^function run_container_\(.*\) {/\1/g') - docker_kill_rm consul-primary consul-secondary consul-ap1 + docker_kill_rm consul-primary consul-secondary consul-ap1 consul-alpha if docker network inspect envoy-tests &>/dev/null ; then echo -n "Deleting network 'envoy-tests'..." @@ -530,6 +556,14 @@ function run_container_s3-ap1 { common_run_container_service s3 ap1 8580 8579 } +function run_container_s1-alpha { + common_run_container_service s1-alpha alpha 8080 8079 +} + +function run_container_s2-alpha { + common_run_container_service s2-alpha alpha 8181 8179 +} + function common_run_container_sidecar_proxy { local service="$1" local CLUSTER="$2" @@ -544,7 +578,7 @@ function common_run_container_sidecar_proxy { "${HASHICORP_DOCKER_PROXY}/envoyproxy/envoy:v${ENVOY_VERSION}" \ envoy \ -c /workdir/${CLUSTER}/envoy/${service}-bootstrap.json \ - -l debug \ + -l trace \ --disable-hot-restart \ --drain-time-s 1 >/dev/null } @@ -564,7 +598,7 @@ function run_container_s1-sidecar-proxy-consul-exec { consul connect envoy -sidecar-for s1 \ -envoy-version ${ENVOY_VERSION} \ -- \ - -l debug >/dev/null + -l trace >/dev/null } function run_container_s2-sidecar-proxy { @@ -606,6 +640,13 @@ function run_container_s3-ap1-sidecar-proxy { common_run_container_sidecar_proxy s3 ap1 } +function run_container_s1-sidecar-proxy-alpha { + common_run_container_sidecar_proxy s1 alpha +} +function run_container_s2-sidecar-proxy-alpha { + common_run_container_sidecar_proxy s2 alpha +} + function common_run_container_gateway { local name="$1" local DC="$2" @@ -620,7 +661,7 @@ function common_run_container_gateway { "${HASHICORP_DOCKER_PROXY}/envoyproxy/envoy:v${ENVOY_VERSION}" \ envoy \ -c /workdir/${DC}/envoy/${name}-bootstrap.json \ - -l debug \ + -l trace \ --disable-hot-restart \ --drain-time-s 1 >/dev/null } @@ -631,6 +672,9 @@ function run_container_gateway-primary { function run_container_gateway-secondary { common_run_container_gateway mesh-gateway secondary } +function run_container_gateway-alpha { + common_run_container_gateway mesh-gateway alpha +} function run_container_ingress-gateway-primary { common_run_container_gateway ingress-gateway primary @@ -699,6 +743,10 @@ function run_container_tcpdump-secondary { # To use add "tcpdump-secondary" to REQUIRED_SERVICES common_run_container_tcpdump secondary } +function run_container_tcpdump-alpha { + # To use add "tcpdump-alpha" to REQUIRED_SERVICES + common_run_container_tcpdump alpha +} function common_run_container_tcpdump { local DC="$1"