From 4b85aa5a97abe1ec018baefec3b6d8dab824de34 Mon Sep 17 00:00:00 2001
From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com>
Date: Thu, 2 Nov 2023 16:13:16 -0500
Subject: [PATCH] testing/deployer: support tproxy in v2 for dataplane (#19094)

This updates the testing/deployer (aka "topology test") framework to
allow a v2-oriented topology to opt services into enabling
TransparentProxy. The restrictions are similar to those of #19046.

The multiport Ports map that was added in #19046 was changed to allow
the protocol to be specified at this time, but for now the only
supported protocol is TCP, since only L4 currently functions on main.

As part of making transparent proxy work, the DNS server needed a new
zonefile for responding to virtual.consul requests, since there is no
Kubernetes DNS and the Consul DNS work for v2 has not happened yet. Once
Consul DNS supports v2 we should switch over. For now the format of
queries is:

<name>--<namespace>--<partition>.virtual.consul

Additionally:

- All transparent-proxy-enabled services are assigned a virtual IP in
  the 10.244.0.0/24 range. This is something Consul will do itself in
  v2 at a later date, likely during 1.18.
- All services with exposed (non-mesh) ports are assigned a virtual
  port number for use with tproxy.
- The consul-dataplane image has been made un-distroless and given the
  necessary tools to execute consul connect redirect-traffic before
  running dataplane, thus simulating a Kubernetes init container in
  plain docker.
---
 .../catalogv2/explicit_destinations_test.go   |  28 +--
 test-integ/catalogv2/helpers_test.go          |  22 ++
 .../catalogv2/implicit_destinations_test.go   | 214 ++++++++++++++++++
 .../ac3_service_defaults_upstream_test.go     |  11 +-
 .../ac4_proxy_defaults_test.go                |   4 +-
 test-integ/topoutil/asserter.go               |  25 +-
 test-integ/topoutil/fixtures.go               |  12 +-
 .../consul-container/libs/assert/envoy.go     |   2 +-
 .../consul-container/libs/assert/service.go   |   1 -
 testing/deployer/sprawl/catalog.go            |  67 ++++--
 testing/deployer/sprawl/details.go            |   2 +-
 .../deployer/sprawl/internal/build/docker.go  |  77 +++++++
 testing/deployer/sprawl/internal/tfgen/dns.go |  95 +++++++-
 testing/deployer/sprawl/internal/tfgen/gen.go |   7 +-
 .../deployer/sprawl/internal/tfgen/nodes.go   |   6 +-
 .../templates/container-app-dataplane.tf.tmpl |  11 +
 testing/deployer/topology/compile.go          | 171 ++++++++++++--
 testing/deployer/topology/images.go           |  20 +-
 testing/deployer/topology/relationships.go    |  19 +-
 testing/deployer/topology/topology.go         | 140 +++++++++---
 20 files changed, 810 insertions(+), 124 deletions(-)
 create mode 100644 test-integ/catalogv2/helpers_test.go
 create mode 100644 test-integ/catalogv2/implicit_destinations_test.go

diff --git a/test-integ/catalogv2/explicit_destinations_test.go b/test-integ/catalogv2/explicit_destinations_test.go
index a62d98b7c6..9c86e05952 100644
--- a/test-integ/catalogv2/explicit_destinations_test.go
+++ b/test-integ/catalogv2/explicit_destinations_test.go
@@ -5,7 +5,6 @@ package catalogv2
 
 import (
 	"fmt"
-	"strings"
 	"testing"
 
 	pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
@@ -18,7 +17,7 @@ import (
 	"github.com/hashicorp/consul/test-integ/topoutil"
 )
 
-// TestBasicL4ExplicitDestination sets up the following:
+// TestBasicL4ExplicitDestinations sets up the following:
 //
 // - 1 cluster (no peering / no wanfed)
 // - 3 servers in that cluster
@@ -37,8 +36,8 @@ import (
 // - part1/default
 // - default/nsa
 // - part1/nsa
-func TestBasicL4ExplicitDestination(t *testing.T) {
-	cfg := testBasicL4ExplicitDestinationCreator{}.NewConfig(t)
+func 
TestBasicL4ExplicitDestinations(t *testing.T) { + cfg := testBasicL4ExplicitDestinationsCreator{}.NewConfig(t) sp := sprawltest.Launch(t, cfg) @@ -69,20 +68,11 @@ func TestBasicL4ExplicitDestination(t *testing.T) { for _, ship := range ships { t.Run("relationship: "+ship.String(), func(t *testing.T) { var ( - svc = ship.Caller - u = ship.Upstream - clusterPrefix string + svc = ship.Caller + u = ship.Upstream ) - if u.Peer == "" { - if u.ID.PartitionOrDefault() == "default" { - clusterPrefix = strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".") - } else { - clusterPrefix = strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".") - } - } else { - clusterPrefix = strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".") - } + clusterPrefix := clusterPrefixForUpstream(u) asserter.UpstreamEndpointStatus(t, svc, clusterPrefix+".", "HEALTHY", 1) asserter.HTTPServiceEchoes(t, svc, u.LocalPort, "") @@ -91,9 +81,9 @@ func TestBasicL4ExplicitDestination(t *testing.T) { } } -type testBasicL4ExplicitDestinationCreator struct{} +type testBasicL4ExplicitDestinationsCreator struct{} -func (c testBasicL4ExplicitDestinationCreator) NewConfig(t *testing.T) *topology.Config { +func (c testBasicL4ExplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config { const clusterName = "dc1" servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil) @@ -129,7 +119,7 @@ func (c testBasicL4ExplicitDestinationCreator) NewConfig(t *testing.T) *topology } } -func (c testBasicL4ExplicitDestinationCreator) topologyConfigAddNodes( +func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes( t *testing.T, cluster *topology.Cluster, nodeName func() string, diff --git a/test-integ/catalogv2/helpers_test.go b/test-integ/catalogv2/helpers_test.go new file mode 100644 index 0000000000..f5e352779a --- /dev/null +++ b/test-integ/catalogv2/helpers_test.go @@ -0,0 +1,22 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package catalogv2 + +import ( + "strings" + + "github.com/hashicorp/consul/testing/deployer/topology" +) + +func clusterPrefixForUpstream(u *topology.Upstream) string { + if u.Peer == "" { + if u.ID.PartitionOrDefault() == "default" { + return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".") + } else { + return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".") + } + } else { + return strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".") + } +} diff --git a/test-integ/catalogv2/implicit_destinations_test.go b/test-integ/catalogv2/implicit_destinations_test.go new file mode 100644 index 0000000000..daf19945a9 --- /dev/null +++ b/test-integ/catalogv2/implicit_destinations_test.go @@ -0,0 +1,214 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package catalogv2 + +import ( + "fmt" + "testing" + + pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" + libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" + "github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest" + "github.com/hashicorp/consul/testing/deployer/topology" + + "github.com/hashicorp/consul/test-integ/topoutil" +) + +// TestBasicL4ImplicitDestinations sets up the following: +// +// - 1 cluster (no peering / no wanfed) +// - 3 servers in that cluster +// - v2 arch is activated +// - for each tenancy, only using v2 constructs: +// - a server exposing 2 tcp ports +// - a client with transparent proxy enabled and no explicit upstreams +// - a traffic permission granting the client access to the service on all ports +// +// When this test is executed in CE it will only use the default/default +// tenancy. +// +// When this test is executed in Enterprise it will additionally test the same +// things within these tenancies: +// +// - part1/default +// - default/nsa +// - part1/nsa +func TestBasicL4ImplicitDestinations(t *testing.T) { + cfg := testBasicL4ImplicitDestinationsCreator{}.NewConfig(t) + + sp := sprawltest.Launch(t, cfg) + + var ( + asserter = topoutil.NewAsserter(sp) + + topo = sp.Topology() + cluster = topo.Clusters["dc1"] + + ships = topo.ComputeRelationships() + ) + + clientV2 := sp.ResourceServiceClientForCluster(cluster.Name) + + t.Log(topology.RenderRelationships(ships)) + + // Make sure things are truly in v2 not v1. + for _, name := range []string{ + "static-server", + "static-client", + } { + libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, name, nil, 1) + } + + // Check relationships + for _, ship := range ships { + t.Run("relationship: "+ship.String(), func(t *testing.T) { + var ( + svc = ship.Caller + u = ship.Upstream + ) + + clusterPrefix := clusterPrefixForUpstream(u) + + asserter.UpstreamEndpointStatus(t, svc, clusterPrefix+".", "HEALTHY", 1) + if u.LocalPort > 0 { + asserter.HTTPServiceEchoes(t, svc, u.LocalPort, "") + } + asserter.FortioFetch2FortioName(t, svc, u, cluster.Name, u.ID) + }) + } +} + +type testBasicL4ImplicitDestinationsCreator struct{} + +func (c testBasicL4ImplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config { + const clusterName = "dc1" + + servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil) + + cluster := &topology.Cluster{ + Enterprise: utils.IsEnterprise(), + Name: clusterName, + Nodes: servers, + } + + lastNode := 0 + nodeName := func() string { + lastNode++ + return fmt.Sprintf("%s-box%d", clusterName, lastNode) + } + + c.topologyConfigAddNodes(t, cluster, nodeName, "default", "default") + if cluster.Enterprise { + c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "default") + c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "nsa") + c.topologyConfigAddNodes(t, cluster, nodeName, "default", "nsa") + } + + return &topology.Config{ + Images: topoutil.TargetImages(), + Networks: []*topology.Network{ + {Name: clusterName}, + {Name: "wan", Type: "wan"}, + }, + Clusters: []*topology.Cluster{ + cluster, + }, + } +} + +func (c testBasicL4ImplicitDestinationsCreator) topologyConfigAddNodes( + t *testing.T, + cluster *topology.Cluster, + nodeName func() string, + partition, + namespace string, +) { + clusterName := cluster.Name 
+ + newServiceID := func(name string) topology.ServiceID { + return topology.ServiceID{ + Partition: partition, + Namespace: namespace, + Name: name, + } + } + + tenancy := &pbresource.Tenancy{ + Partition: partition, + Namespace: namespace, + PeerName: "local", + } + + serverNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewFortioServiceWithDefaults( + clusterName, + newServiceID("static-server"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.EnableTransparentProxy = true + }, + ), + }, + } + clientNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewFortioServiceWithDefaults( + clusterName, + newServiceID("static-client"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.EnableTransparentProxy = true + svc.ImpliedUpstreams = []*topology.Upstream{ + { + ID: newServiceID("static-server"), + PortName: "http", + }, + { + ID: newServiceID("static-server"), + PortName: "http-alt", + }, + } + }, + ), + }, + } + trafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbauth.TrafficPermissionsType, + Name: "static-server-perms", + Tenancy: tenancy, + }, + }, &pbauth.TrafficPermissions{ + Destination: &pbauth.Destination{ + IdentityName: "static-server", + }, + Action: pbauth.Action_ACTION_ALLOW, + Permissions: []*pbauth.Permission{{ + Sources: []*pbauth.Source{{ + IdentityName: "static-client", + Namespace: namespace, + }}, + }}, + }) + + cluster.Nodes = append(cluster.Nodes, + clientNode, + serverNode, + ) + + cluster.InitialResources = append(cluster.InitialResources, + trafficPerms, + ) +} diff --git a/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go b/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go index 586103c111..b8c7b83f2a 100644 --- a/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go +++ b/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go @@ -11,14 +11,13 @@ import ( "testing" "time" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil/retry" + libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" "github.com/hashicorp/consul/testing/deployer/topology" "github.com/hashicorp/go-cleanhttp" "github.com/itchyny/gojq" "github.com/stretchr/testify/require" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil/retry" - libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" ) var ac3SvcDefaultsSuites []sharedTopoSuite = []sharedTopoSuite{ @@ -185,7 +184,7 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) { // TODO: what is default? namespace? partition? 
clusterName := fmt.Sprintf("%s.default.%s.external", s.upstream.ID.Name, s.upstream.Peer) nonceStatus := http.StatusInsufficientStorage - url507 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort, + url507 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/?status=%d", s.upstream.LocalPort, nonceStatus)), ) @@ -221,7 +220,7 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) { require.True(r, resultAsBool) }) - url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort, + url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/", s.upstream.LocalPort)), ) retry.RunWith(&retry.Timer{Timeout: time.Minute * 1, Wait: time.Millisecond * 500}, t, func(r *retry.R) { diff --git a/test-integ/peering_commontopo/ac4_proxy_defaults_test.go b/test-integ/peering_commontopo/ac4_proxy_defaults_test.go index c413820c6f..a19782bbab 100644 --- a/test-integ/peering_commontopo/ac4_proxy_defaults_test.go +++ b/test-integ/peering_commontopo/ac4_proxy_defaults_test.go @@ -180,11 +180,11 @@ func (s *ac4ProxyDefaultsSuite) test(t *testing.T, ct *commonTopo) { }) t.Run("HTTP service fails due to connection timeout", func(t *testing.T) { - url504 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort, + url504 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/?delay=1000ms", s.upstream.LocalPort)), ) - url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort, + url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/", s.upstream.LocalPort)), ) diff --git a/test-integ/topoutil/asserter.go b/test-integ/topoutil/asserter.go index 3795e18794..dcf9da52d0 100644 --- a/test-integ/topoutil/asserter.go +++ b/test-integ/topoutil/asserter.go @@ -234,10 +234,21 @@ func (a *Asserter) fortioFetch2Upstream( ) (body []byte, res *http.Response) { t.Helper() - // TODO: fortioSvc.ID.Normalize()? or should that be up to the caller? 
+ var actualURL string + if upstream.Implied { + actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s", + upstream.ID.Name, + upstream.ID.Namespace, + upstream.ID.Partition, + upstream.VirtualPort, + path, + ) + } else { + actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path) + } url := fmt.Sprintf("http://%s/fortio/fetch2?url=%s", addr, - url.QueryEscape(fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)), + url.QueryEscape(actualURL), ) req, err := http.NewRequest(http.MethodPost, url, nil) @@ -246,6 +257,7 @@ func (a *Asserter) fortioFetch2Upstream( res, err = client.Do(req) require.NoError(t, err) defer res.Body.Close() + // not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode) require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode) @@ -281,7 +293,13 @@ func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioSvc *topology.Serv // similar to libassert.AssertFortioName, // uses the /fortio/fetch2 endpoint to hit the debug endpoint on the upstream, // and assert that the FORTIO_NAME == name -func (a *Asserter) FortioFetch2FortioName(t *testing.T, fortioSvc *topology.Service, upstream *topology.Upstream, clusterName string, sid topology.ServiceID) { +func (a *Asserter) FortioFetch2FortioName( + t *testing.T, + fortioSvc *topology.Service, + upstream *topology.Upstream, + clusterName string, + sid topology.ServiceID, +) { t.Helper() var ( @@ -295,6 +313,7 @@ func (a *Asserter) FortioFetch2FortioName(t *testing.T, fortioSvc *topology.Serv retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) { body, res := a.fortioFetch2Upstream(r, client, addr, upstream, path) + require.Equal(r, http.StatusOK, res.StatusCode) // TODO: not sure we should retry these? 
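For reference, the two addressing modes the asserter now exercises can be
sketched as a standalone Go program. The URL shapes come directly from the
fortioFetch2Upstream change above; the upstream struct and the buildFetch2URL
helper are illustrative stand-ins, not part of the framework:

package main

import (
	"fmt"
	"net/url"
)

// upstream mirrors just the topology.Upstream fields used by the asserter.
type upstream struct {
	Name, Namespace, Partition string
	Implied                    bool
	VirtualPort                uint32
	LocalPort                  int
}

// buildFetch2URL shows the two addressing modes: implied (tproxy) upstreams
// are reached via the simulated <name>--<namespace>--<partition>.virtual.consul
// name and virtual port, while explicit upstreams still use the local
// listener port.
func buildFetch2URL(addr string, u upstream, path string) string {
	var actual string
	if u.Implied {
		actual = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s",
			u.Name, u.Namespace, u.Partition, u.VirtualPort, path)
	} else {
		actual = fmt.Sprintf("http://localhost:%d/%s", u.LocalPort, path)
	}
	return fmt.Sprintf("http://%s/fortio/fetch2?url=%s", addr, url.QueryEscape(actual))
}

func main() {
	u := upstream{Name: "static-server", Namespace: "default",
		Partition: "default", Implied: true, VirtualPort: 8080}
	fmt.Println(buildFetch2URL("localhost:9090", u, "debug"))
}

Either way the request is funneled through fortio's /fortio/fetch2 endpoint on
the calling service; only the inner URL differs.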
diff --git a/test-integ/topoutil/fixtures.go b/test-integ/topoutil/fixtures.go index 18742dd2c0..c8e9afbe10 100644 --- a/test-integ/topoutil/fixtures.go +++ b/test-integ/topoutil/fixtures.go @@ -41,10 +41,14 @@ func NewFortioServiceWithDefaults( } if nodeVersion == topology.NodeVersionV2 { - svc.Ports = map[string]int{ - "http": httpPort, - "http-alt": httpPort, - "grpc": grpcPort, + svc.Ports = map[string]*topology.Port{ + // TODO(rb/v2): once L7 works in v2 switch these back + "http": {Number: httpPort, Protocol: "tcp"}, + "http-alt": {Number: httpPort, Protocol: "tcp"}, + "grpc": {Number: grpcPort, Protocol: "tcp"}, + // "http": {Number: httpPort, Protocol: "http"}, + // "http-alt": {Number: httpPort, Protocol: "http"}, + // "grpc": {Number: grpcPort, Protocol: "grpc"}, } } else { svc.Port = httpPort diff --git a/test/integration/consul-container/libs/assert/envoy.go b/test/integration/consul-container/libs/assert/envoy.go index 35f9873741..076f2e1af6 100644 --- a/test/integration/consul-container/libs/assert/envoy.go +++ b/test/integration/consul-container/libs/assert/envoy.go @@ -118,7 +118,7 @@ func AssertUpstreamEndpointStatusWithClient( clusterName, healthStatus) results, err := utils.JQFilter(clusters, filter) require.NoErrorf(r, err, "could not find cluster name %q: %v \n%s", clusterName, err, clusters) - require.Len(r, results, 1) // the final part of the pipeline is "length" which only ever returns 1 result + require.Len(r, results, 1, "clusters: "+clusters) // the final part of the pipeline is "length" which only ever returns 1 result result, err := strconv.Atoi(results[0]) assert.NoError(r, err) diff --git a/test/integration/consul-container/libs/assert/service.go b/test/integration/consul-container/libs/assert/service.go index 72e8ef06e7..7434a1d5e3 100644 --- a/test/integration/consul-container/libs/assert/service.go +++ b/test/integration/consul-container/libs/assert/service.go @@ -63,7 +63,6 @@ func CatalogV2ServiceDoesNotExist(t *testing.T, client pbresource.ResourceServic // number of workload endpoints. 
func CatalogV2ServiceHasEndpointCount(t *testing.T, client pbresource.ResourceServiceClient, svc string, tenancy *pbresource.Tenancy, count int) { t.Helper() - require.False(t, count == 0) ctx := testutil.TestContext(t) retry.Run(t, func(r *retry.R) { diff --git a/testing/deployer/sprawl/catalog.go b/testing/deployer/sprawl/catalog.go index 75a4fb6fd6..b19626ba7d 100644 --- a/testing/deployer/sprawl/catalog.go +++ b/testing/deployer/sprawl/catalog.go @@ -196,8 +196,9 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster if node.IsV2() { pending := serviceInstanceToResources(node, svc) - if _, ok := identityInfo[svc.ID]; !ok { - identityInfo[svc.ID] = pending.WorkloadIdentity + workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition) + if _, ok := identityInfo[workloadID]; !ok { + identityInfo[workloadID] = pending.WorkloadIdentity } // Write workload @@ -230,6 +231,15 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster return err } } + if pending.ProxyConfiguration != nil { + res, err := pending.ProxyConfiguration.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err) + } + if _, err := s.writeResource(cluster, res); err != nil { + return err + } + } } else { if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil { return fmt.Errorf("error registering service: %w", err) @@ -268,6 +278,7 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster }, Data: svcData, } + res, err := svcInfo.Build() if err != nil { return fmt.Errorf("error serializing resource %s: %w", util.IDToString(svcInfo.Resource.Id), err) @@ -482,10 +493,11 @@ func (r *Resource[V]) Build() (*pbresource.Resource, error) { } type ServiceResources struct { - Workload *Resource[*pbcatalog.Workload] - HealthStatuses []*Resource[*pbcatalog.HealthStatus] - Destinations *Resource[*pbmesh.Destinations] - WorkloadIdentity *Resource[*pbauth.WorkloadIdentity] + Workload *Resource[*pbcatalog.Workload] + HealthStatuses []*Resource[*pbcatalog.HealthStatus] + Destinations *Resource[*pbmesh.Destinations] + WorkloadIdentity *Resource[*pbauth.WorkloadIdentity] + ProxyConfiguration *Resource[*pbmesh.ProxyConfiguration] } func serviceInstanceToResources( @@ -506,8 +518,8 @@ func serviceInstanceToResources( ) for name, port := range svc.Ports { wlPorts[name] = &pbcatalog.WorkloadPort{ - Port: uint32(port), - Protocol: pbcatalog.Protocol_PROTOCOL_TCP, + Port: uint32(port.Number), + Protocol: port.ActualProtocol, } } @@ -534,21 +546,20 @@ func serviceInstanceToResources( }, }, } - - worloadIdentityRes = &Resource[*pbauth.WorkloadIdentity]{ + workloadIdentityRes = &Resource[*pbauth.WorkloadIdentity]{ Resource: &pbresource.Resource{ Id: &pbresource.ID{ Type: pbauth.WorkloadIdentityType, - Name: svc.ID.Name, + Name: svc.WorkloadIdentity, Tenancy: tenancy, }, - Metadata: svc.Meta, }, Data: &pbauth.WorkloadIdentity{}, } healthResList []*Resource[*pbcatalog.HealthStatus] destinationsRes *Resource[*pbmesh.Destinations] + proxyConfigRes *Resource[*pbmesh.ProxyConfiguration] ) if svc.HasCheck() { @@ -577,11 +588,6 @@ func serviceInstanceToResources( } if !svc.DisableServiceMesh { - workloadRes.Data.Ports["mesh"] = &pbcatalog.WorkloadPort{ - Port: uint32(svc.EnvoyPublicListenerPort), - Protocol: pbcatalog.Protocol_PROTOCOL_MESH, - } - destinationsRes = &Resource[*pbmesh.Destinations]{ Resource: &pbresource.Resource{ Id: 
&pbresource.ID{ @@ -615,13 +621,32 @@ func serviceInstanceToResources( } destinationsRes.Data.Destinations = append(destinationsRes.Data.Destinations, dest) } + + if svc.EnableTransparentProxy { + proxyConfigRes = &Resource[*pbmesh.ProxyConfiguration]{ + Resource: &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbmesh.ProxyConfigurationType, + Name: svc.Workload, + Tenancy: tenancy, + }, + }, + Data: &pbmesh.ProxyConfiguration{ + Workloads: selector, + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + }, + }, + } + } } return &ServiceResources{ - Workload: workloadRes, - HealthStatuses: healthResList, - Destinations: destinationsRes, - WorkloadIdentity: worloadIdentityRes, + Workload: workloadRes, + HealthStatuses: healthResList, + Destinations: destinationsRes, + WorkloadIdentity: workloadIdentityRes, + ProxyConfiguration: proxyConfigRes, } } diff --git a/testing/deployer/sprawl/details.go b/testing/deployer/sprawl/details.go index b12fa02e5a..a0c6c0c2a5 100644 --- a/testing/deployer/sprawl/details.go +++ b/testing/deployer/sprawl/details.go @@ -72,7 +72,7 @@ func (s *Sprawl) PrintDetails() error { } else { ports := make(map[string]int) for name, port := range svc.Ports { - ports[name] = node.ExposedPort(port) + ports[name] = node.ExposedPort(port.Number) } cd.Apps = append(cd.Apps, appDetail{ Type: "app", diff --git a/testing/deployer/sprawl/internal/build/docker.go b/testing/deployer/sprawl/internal/build/docker.go index ac1976dad4..b8ca695f9b 100644 --- a/testing/deployer/sprawl/internal/build/docker.go +++ b/testing/deployer/sprawl/internal/build/docker.go @@ -35,6 +35,64 @@ USER 100:0 ENTRYPOINT [] ` +const dockerfileDataplaneForTProxy = ` +ARG DATAPLANE_IMAGE +ARG CONSUL_IMAGE +FROM ${CONSUL_IMAGE} AS consul +FROM ${DATAPLANE_IMAGE} AS distroless +FROM debian:bullseye-slim + +# undo the distroless aspect +COPY --from=distroless /usr/local/bin/discover /usr/local/bin/ +COPY --from=distroless /usr/local/bin/envoy /usr/local/bin/ +COPY --from=distroless /usr/local/bin/consul-dataplane /usr/local/bin/ +COPY --from=distroless /licenses/copyright.txt /licenses/ + +COPY --from=consul /bin/consul /bin/ + +# Install iptables and sudo, needed for tproxy. +RUN apt update -y \ + && apt install -y iptables sudo curl dnsutils + +RUN sed '/_apt/d' /etc/passwd > /etc/passwd.new \ + && mv -f /etc/passwd.new /etc/passwd \ + && adduser --uid=100 consul --no-create-home --disabled-password --system \ + && adduser consul sudo \ + && echo 'consul ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers + +COPY <<'EOF' /bin/tproxy-startup.sh +#!/bin/sh + +set -ex + +# HACK: UID of consul in the consul-client container +# This is conveniently also the UID of apt in the envoy container +CONSUL_UID=100 +ENVOY_UID=$(id -u) + +# - We allow 19000 so that the test can directly visit the envoy admin page. +# - We allow 20000 so that envoy can receive mTLS traffic from other nodes. +# - We (reluctantly) allow 8080 so that we can bypass envoy and talk to fortio +# to do test assertions. 
+sudo consul connect redirect-traffic \
+    -proxy-uid $ENVOY_UID \
+    -exclude-uid $CONSUL_UID \
+    -proxy-inbound-port=15001 \
+    -exclude-inbound-port=19000 \
+    -exclude-inbound-port=20000 \
+    -exclude-inbound-port=8080
+exec "$@"
+EOF
+
+RUN chmod +x /bin/tproxy-startup.sh \
+    && chown 100:0 /bin/tproxy-startup.sh
+
+RUN echo 'consul ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
+
+USER 100:0
+ENTRYPOINT []
+`
+
 func DockerImages(
 	logger hclog.Logger,
 	run *runner.Runner,
@@ -80,6 +138,25 @@ func DockerImages(
 
 			built[cdp] = struct{}{}
 		}
+
+		cdpTproxy := n.Images.LocalDataplaneTProxyImage()
+		if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok {
+			logger.Info("building image", "image", cdpTproxy)
+			err := run.DockerExec(context.TODO(), []string{
+				"build",
+				"--build-arg",
+				"DATAPLANE_IMAGE=" + n.Images.Dataplane,
+				"--build-arg",
+				"CONSUL_IMAGE=" + n.Images.Consul,
+				"-t", cdpTproxy,
+				"-",
+			}, logw, strings.NewReader(dockerfileDataplaneForTProxy))
+			if err != nil {
+				return err
+			}
+
+			built[cdpTproxy] = struct{}{}
+		}
 	}
 }
diff --git a/testing/deployer/sprawl/internal/tfgen/dns.go b/testing/deployer/sprawl/internal/tfgen/dns.go
index 20dca878eb..9b03693c83 100644
--- a/testing/deployer/sprawl/internal/tfgen/dns.go
+++ b/testing/deployer/sprawl/internal/tfgen/dns.go
@@ -8,8 +8,11 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sort"
 	"strings"
 
+	"golang.org/x/exp/maps"
+
 	"github.com/hashicorp/consul/testing/deployer/topology"
 	"github.com/hashicorp/consul/testing/deployer/util"
 )
@@ -63,17 +66,36 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string
 		}
 	}
 
+	// Until Consul DNS understands v2, simulate it.
+	//
+	// NOTE: this DNS is not quite what consul normally does. It's simpler
+	// to simulate this format here.
+	virtualNames := make(map[string][]string)
+	for id, svcData := range cluster.Services {
+		if len(svcData.VirtualIps) == 0 {
+			continue
+		}
+		vips := svcData.VirtualIps
+
+		// <name>--<namespace>--<partition>.virtual.<domain>
+ name := fmt.Sprintf("%s--%s--%s", id.Name, id.Namespace, id.Partition) + virtualNames[name] = vips + } + var ( clusterDNSName = cluster.Name + "-consulcluster.lan" - ) + virtualDNSName = "virtual.consul" - corefilePath := filepath.Join(rootdir, "Corefile") - zonefilePath := filepath.Join(rootdir, "servers") + corefilePath = filepath.Join(rootdir, "Corefile") + zonefilePath = filepath.Join(rootdir, "servers") + virtualZonefilePath = filepath.Join(rootdir, "virtual") + ) _, err := UpdateFileIfDifferent( g.logger, generateCoreDNSConfigFile( clusterDNSName, + virtualDNSName, addrs, ), corefilePath, @@ -105,7 +127,25 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string return false, nil, fmt.Errorf("error hashing %q: %w", zonefilePath, err) } - return true, []string{corefileHash, zonefileHash}, nil + _, err = UpdateFileIfDifferent( + g.logger, + generateCoreDNSVirtualZoneFile( + dnsIPAddress, + virtualDNSName, + virtualNames, + ), + virtualZonefilePath, + 0644, + ) + if err != nil { + return false, nil, fmt.Errorf("error writing %q: %w", virtualZonefilePath, err) + } + virtualZonefileHash, err := util.HashFile(virtualZonefilePath) + if err != nil { + return false, nil, fmt.Errorf("error hashing %q: %w", virtualZonefilePath, err) + } + + return true, []string{corefileHash, zonefileHash, virtualZonefileHash}, nil } return false, nil, nil @@ -113,6 +153,7 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string func generateCoreDNSConfigFile( clusterDNSName string, + virtualDNSName string, addrs []string, ) []byte { serverPart := "" @@ -139,7 +180,14 @@ consul:53 { whoami } -%[2]s +%[2]s:53 { + file /config/virtual %[2]s + log + errors + whoami +} + +%[3]s .:53 { forward . 8.8.8.8:53 @@ -147,7 +195,7 @@ consul:53 { errors whoami } -`, clusterDNSName, serverPart)) +`, clusterDNSName, virtualDNSName, serverPart)) } func generateCoreDNSZoneFile( @@ -178,3 +226,38 @@ server IN A %s ; Consul server return buf.Bytes() } + +func generateCoreDNSVirtualZoneFile( + dnsIPAddress string, + virtualDNSName string, + nameToAddr map[string][]string, +) []byte { + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf(` +$TTL 60 +$ORIGIN %[1]s. +@ IN SOA ns.%[1]s. webmaster.%[1]s. ( + 2017042745 ; serial + 7200 ; refresh (2 hours) + 3600 ; retry (1 hour) + 1209600 ; expire (2 weeks) + 3600 ; minimum (1 hour) + ) +@ IN NS ns.%[1]s. 
; Name server +ns IN A %[2]s ; self +`, virtualDNSName, dnsIPAddress)) + + names := maps.Keys(nameToAddr) + sort.Strings(names) + + for _, name := range names { + vips := nameToAddr[name] + for _, vip := range vips { + buf.WriteString(fmt.Sprintf(` +%s IN A %s ; Consul server +`, name, vip)) + } + } + + return buf.Bytes() +} diff --git a/testing/deployer/sprawl/internal/tfgen/gen.go b/testing/deployer/sprawl/internal/tfgen/gen.go index f64d7b6a25..0f38714b0c 100644 --- a/testing/deployer/sprawl/internal/tfgen/gen.go +++ b/testing/deployer/sprawl/internal/tfgen/gen.go @@ -122,8 +122,10 @@ func (s Step) String() string { } } -func (s Step) StartServers() bool { return s >= StepServers } -func (s Step) StartAgents() bool { return s >= StepAgents } +func (s Step) StartServers() bool { return s >= StepServers } + +func (s Step) StartAgents() bool { return s >= StepAgents } + func (s Step) StartServices() bool { return s >= StepServices } // func (s Step) InitiatePeering() bool { return s >= StepPeering } @@ -260,6 +262,7 @@ func (g *Generator) Generate(step Step) error { addImage("", node.Images.Consul) addImage("", node.Images.EnvoyConsulImage()) addImage("", node.Images.LocalDataplaneImage()) + addImage("", node.Images.LocalDataplaneTProxyImage()) if node.IsAgent() { addVolume(node.DockerName()) diff --git a/testing/deployer/sprawl/internal/tfgen/nodes.go b/testing/deployer/sprawl/internal/tfgen/nodes.go index 6f105b6f5a..4934482d72 100644 --- a/testing/deployer/sprawl/internal/tfgen/nodes.go +++ b/testing/deployer/sprawl/internal/tfgen/nodes.go @@ -125,7 +125,11 @@ func (g *Generator) generateNodeContainers( var img string if node.IsDataplane() { tmpl = tfAppDataplaneT - img = DockerImageResourceName(node.Images.LocalDataplaneImage()) + if svc.EnableTransparentProxy { + img = DockerImageResourceName(node.Images.LocalDataplaneTProxyImage()) + } else { + img = DockerImageResourceName(node.Images.LocalDataplaneImage()) + } } else { img = DockerImageResourceName(node.Images.EnvoyConsulImage()) } diff --git a/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl b/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl index fa44090c80..f706b6ad2d 100644 --- a/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl +++ b/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl @@ -17,6 +17,13 @@ resource "docker_container" "{{.Node.DockerName}}-{{.Service.ID.TFString}}-sidec read_only = true } +{{ if .Service.EnableTransparentProxy }} + capabilities { + add = ["NET_ADMIN"] + } + entrypoint = [ "/bin/tproxy-startup.sh" ] +{{ end }} + env = [ "DP_CONSUL_ADDRESSES=server.{{.Node.Cluster}}-consulcluster.lan", {{ if .Node.IsV2 }} @@ -39,6 +46,10 @@ resource "docker_container" "{{.Node.DockerName}}-{{.Service.ID.TFString}}-sidec "DP_CREDENTIAL_STATIC_TOKEN={{.Token}}", {{ end }} +{{ if .Service.EnableTransparentProxy }} + "REDIRECT_TRAFFIC_ARGS=-exclude-inbound-port=19000", +{{ end }} + // for demo purposes "DP_ENVOY_ADMIN_BIND_ADDRESS=0.0.0.0", "DP_ENVOY_ADMIN_BIND_PORT=19000", diff --git a/testing/deployer/topology/compile.go b/testing/deployer/topology/compile.go index c9249d43ed..beaace3e14 100644 --- a/testing/deployer/topology/compile.go +++ b/testing/deployer/topology/compile.go @@ -317,6 +317,13 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error return nil, fmt.Errorf("cluster %q node %q has more than one public address", c.Name, n.Name) } + if n.IsDataplane() && 
len(n.Services) > 1 { + // Our use of consul-dataplane here is supposed to mimic that + // of consul-k8s, which ultimately has one IP per Service, so + // we introduce the same limitation here. + return nil, fmt.Errorf("cluster %q node %q uses dataplane, but has more than one service", c.Name, n.Name) + } + seenServices := make(map[ServiceID]struct{}) for _, svc := range n.Services { if n.IsAgent() { @@ -387,7 +394,7 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error // return nil, fmt.Errorf("service has invalid protocol: %s", svc.Protocol) // } - for _, u := range svc.Upstreams { + defaultUpstream := func(u *Upstream) error { // Default to that of the enclosing service. if u.Peer == "" { if u.ID.Partition == "" { @@ -406,17 +413,43 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error addTenancy(u.ID.Partition, u.ID.Namespace) - if u.LocalAddress == "" { - // v1 defaults to 127.0.0.1 but v2 does not. Safe to do this generally though. - u.LocalAddress = "127.0.0.1" + if u.Implied { + if u.PortName == "" { + return fmt.Errorf("implicit upstreams must use port names in v2") + } + } else { + if u.LocalAddress == "" { + // v1 defaults to 127.0.0.1 but v2 does not. Safe to do this generally though. + u.LocalAddress = "127.0.0.1" + } + if u.PortName != "" && n.IsV1() { + return fmt.Errorf("explicit upstreams cannot use port names in v1") + } + if u.PortName == "" && n.IsV2() { + // Assume this is a v1->v2 conversion and name it. + u.PortName = "legacy" + } } - if u.PortName != "" && n.IsV1() { - return nil, fmt.Errorf("explicit upstreams cannot use port names in v1") + return nil + } + + for _, u := range svc.Upstreams { + if err := defaultUpstream(u); err != nil { + return nil, err } - if u.PortName == "" && n.IsV2() { - // Assume this is a v1->v2 conversion and name it. 
- u.PortName = "legacy" + } + + if n.IsV2() { + for _, u := range svc.ImpliedUpstreams { + u.Implied = true + if err := defaultUpstream(u); err != nil { + return nil, err + } + } + } else { + if len(svc.ImpliedUpstreams) > 0 { + return nil, fmt.Errorf("v1 does not support implied upstreams yet") } } @@ -424,31 +457,36 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error return nil, fmt.Errorf("cluster %q node %q service %q is not valid: %w", c.Name, n.Name, svc.ID.String(), err) } + if svc.EnableTransparentProxy && !n.IsDataplane() { + return nil, fmt.Errorf("cannot enable tproxy on a non-dataplane node") + } + if n.IsV2() { if implicitV2Services { svc.V2Services = []string{svc.ID.Name} var svcPorts []*pbcatalog.ServicePort - for name := range svc.Ports { + for name, cfg := range svc.Ports { svcPorts = append(svcPorts, &pbcatalog.ServicePort{ TargetPort: name, - Protocol: pbcatalog.Protocol_PROTOCOL_TCP, // TODO - }) - } - if !svc.DisableServiceMesh { - svcPorts = append(svcPorts, &pbcatalog.ServicePort{ - TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + Protocol: cfg.ActualProtocol, }) } v2svc := &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Names: []string{svc.Workload}, - }, - Ports: svcPorts, + Workloads: &pbcatalog.WorkloadSelector{}, + Ports: svcPorts, } - c.Services[svc.ID] = v2svc + prev, ok := c.Services[svc.ID] + if !ok { + c.Services[svc.ID] = v2svc + prev = v2svc + } + if prev.Workloads == nil { + prev.Workloads = &pbcatalog.WorkloadSelector{} + } + prev.Workloads.Names = append(prev.Workloads.Names, svc.Workload) } else { for _, name := range svc.V2Services { @@ -466,20 +504,45 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error } } - if len(svc.WorkloadIdentities) == 0 { - svc.WorkloadIdentities = []string{svc.ID.Name} + if svc.WorkloadIdentity == "" { + svc.WorkloadIdentity = svc.ID.Name } } else { if len(svc.V2Services) > 0 { return nil, fmt.Errorf("cannot specify v2 services for v1") } - if len(svc.WorkloadIdentities) > 0 { + if svc.WorkloadIdentity != "" { return nil, fmt.Errorf("cannot specify workload identities for v1") } } } } + if err := assignVirtualIPs(c); err != nil { + return nil, err + } + + if c.EnableV2 { + // Populate the VirtualPort field on all implied upstreams. + for _, n := range c.Nodes { + for _, svc := range n.Services { + for _, u := range svc.ImpliedUpstreams { + res, ok := c.Services[u.ID] + if ok { + for _, sp := range res.Ports { + if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + continue + } + if sp.TargetPort == u.PortName { + u.VirtualPort = sp.VirtualPort + } + } + } + } + } + } + } + // Explode this into the explicit list based on stray references made. 
c.Partitions = nil for ap, nsMap := range tenancies { @@ -605,6 +668,21 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error // this helps in generating fortio assertions; otherwise field is ignored u.ID.Partition = remotePeer.Link.Partition } + for _, u := range svc.ImpliedUpstreams { + if u.Peer == "" { + u.Cluster = c.Name + u.Peering = nil + continue + } + remotePeer, ok := c.Peerings[u.Peer] + if !ok { + return nil, fmt.Errorf("not possible") + } + u.Cluster = remotePeer.Link.Name + u.Peering = remotePeer.Link + // this helps in generating fortio assertions; otherwise field is ignored + u.ID.Partition = remotePeer.Link.Partition + } } } } @@ -671,6 +749,51 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error return t, nil } +func assignVirtualIPs(c *Cluster) error { + lastVIPIndex := 1 + for _, svcData := range c.Services { + lastVIPIndex++ + if lastVIPIndex > 250 { + return fmt.Errorf("too many ips using this approach to VIPs") + } + svcData.VirtualIps = []string{ + fmt.Sprintf("10.244.0.%d", lastVIPIndex), + } + + // populate virtual ports where we forgot them + var ( + usedPorts = make(map[uint32]struct{}) + next = uint32(8080) + ) + for _, sp := range svcData.Ports { + if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + continue + } + if sp.VirtualPort > 0 { + usedPorts[sp.VirtualPort] = struct{}{} + } + } + for _, sp := range svcData.Ports { + if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + continue + } + if sp.VirtualPort > 0 { + continue + } + RETRY: + attempt := next + next++ + _, used := usedPorts[attempt] + if used { + goto RETRY + } + usedPorts[attempt] = struct{}{} + sp.VirtualPort = attempt + } + } + return nil +} + const permutedWarning = "use the disabled node kind if you want to ignore a node" func inheritAndValidateNodes( diff --git a/testing/deployer/topology/images.go b/testing/deployer/topology/images.go index 7adb8d3f7e..318154c582 100644 --- a/testing/deployer/topology/images.go +++ b/testing/deployer/topology/images.go @@ -38,13 +38,21 @@ func (i Images) LocalDataplaneImage() string { return "local/" + name + ":" + tag } +func (i Images) LocalDataplaneTProxyImage() string { + return spliceImageNamesAndTags(i.Dataplane, i.Consul, "tproxy") +} + func (i Images) EnvoyConsulImage() string { - if i.Consul == "" || i.Envoy == "" { + return spliceImageNamesAndTags(i.Consul, i.Envoy, "") +} + +func spliceImageNamesAndTags(base1, base2, nameSuffix string) string { + if base1 == "" || base2 == "" { return "" } - img1, tag1, ok1 := strings.Cut(i.Consul, ":") - img2, tag2, ok2 := strings.Cut(i.Envoy, ":") + img1, tag1, ok1 := strings.Cut(base1, ":") + img2, tag2, ok2 := strings.Cut(base2, ":") if !ok1 { tag1 = "latest" } @@ -66,8 +74,12 @@ func (i Images) EnvoyConsulImage() string { name2 = repo2 } + if nameSuffix != "" { + nameSuffix = "-" + nameSuffix + } + // ex: local/hashicorp-consul-and-envoyproxy-envoy:1.15.0-with-v1.26.2 - return "local/" + name1 + "-and-" + name2 + ":" + tag1 + "-with-" + tag2 + return "local/" + name1 + "-and-" + name2 + nameSuffix + ":" + tag1 + "-with-" + tag2 } // TODO: what is this for and why do we need to do this and why is it named this? 
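To make the image-splicing change above concrete, here is a self-contained
sketch (not the package's actual code) of what spliceImageNamesAndTags
produces; it assumes the registry-prefix normalization reduces to replacing
slashes with dashes, and the dataplane/consul tags are made-up examples:

package main

import (
	"fmt"
	"strings"
)

// splice mimics spliceImageNamesAndTags: merge two image references into a
// single local tag, optionally inserting a name suffix (e.g. "tproxy").
func splice(base1, base2, nameSuffix string) string {
	if base1 == "" || base2 == "" {
		return ""
	}
	img1, tag1, ok1 := strings.Cut(base1, ":")
	img2, tag2, ok2 := strings.Cut(base2, ":")
	if !ok1 {
		tag1 = "latest"
	}
	if !ok2 {
		tag2 = "latest"
	}
	name1 := strings.ReplaceAll(img1, "/", "-")
	name2 := strings.ReplaceAll(img2, "/", "-")
	if nameSuffix != "" {
		nameSuffix = "-" + nameSuffix
	}
	return "local/" + name1 + "-and-" + name2 + nameSuffix + ":" + tag1 + "-with-" + tag2
}

func main() {
	// EnvoyConsulImage, matching the example in the code comment above:
	fmt.Println(splice("hashicorp/consul:1.15.0", "envoyproxy/envoy:v1.26.2", ""))
	// local/hashicorp-consul-and-envoyproxy-envoy:1.15.0-with-v1.26.2

	// LocalDataplaneTProxyImage == splice(Dataplane, Consul, "tproxy"):
	fmt.Println(splice("hashicorp/consul-dataplane:1.3.0", "hashicorp/consul:1.17.0", "tproxy"))
	// local/hashicorp-consul-dataplane-and-hashicorp-consul-tproxy:1.3.0-with-1.17.0
}

The derived tag encodes both source images, so rebuilding with a different
consul or dataplane version naturally produces a distinct local image.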
diff --git a/testing/deployer/topology/relationships.go b/testing/deployer/topology/relationships.go index 57c075b77f..8448451f3f 100644 --- a/testing/deployer/topology/relationships.go +++ b/testing/deployer/topology/relationships.go @@ -22,6 +22,12 @@ func (t *Topology) ComputeRelationships() []Relationship { Upstream: u, }) } + for _, u := range s.ImpliedUpstreams { + out = append(out, Relationship{ + Caller: s, + Upstream: u, + }) + } } } } @@ -35,6 +41,10 @@ func RenderRelationships(ships []Relationship) string { w := tabwriter.NewWriter(&buf, 0, 0, 3, ' ', tabwriter.Debug) fmt.Fprintf(w, "DOWN\tnode\tservice\tport\tUP\tservice\t\n") for _, r := range ships { + suffix := "" + if r.Upstream.Implied { + suffix = " (implied)" + } fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\t%s\t\n", r.downCluster(), @@ -42,7 +52,7 @@ func RenderRelationships(ships []Relationship) string { r.Caller.ID.String(), r.Upstream.LocalPort, r.upCluster(), - r.Upstream.ID.String(), + r.Upstream.ID.String()+suffix, ) } fmt.Fprintf(w, "\t\t\t\t\t\t\n") @@ -57,14 +67,19 @@ type Relationship struct { } func (r Relationship) String() string { + suffix := "" + if r.Upstream.PortName != "" { + suffix = " port " + r.Upstream.PortName + } return fmt.Sprintf( - "%s on %s in %s via :%d => %s in %s", + "%s on %s in %s via :%d => %s in %s%s", r.Caller.ID.String(), r.Caller.Node.ID().String(), r.downCluster(), r.Upstream.LocalPort, r.Upstream.ID.String(), r.upCluster(), + suffix, ) } diff --git a/testing/deployer/topology/topology.go b/testing/deployer/topology/topology.go index 48edf56296..b59045b564 100644 --- a/testing/deployer/topology/topology.go +++ b/testing/deployer/topology/topology.go @@ -10,6 +10,7 @@ import ( "net/netip" "reflect" "sort" + "strings" "github.com/hashicorp/consul/api" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" @@ -717,6 +718,32 @@ type ServiceAndNode struct { Node *Node } +// Protocol is a convenience function to use when authoring topology configs. +func Protocol(s string) (pbcatalog.Protocol, bool) { + switch strings.ToLower(s) { + case "tcp": + return pbcatalog.Protocol_PROTOCOL_TCP, true + case "http": + return pbcatalog.Protocol_PROTOCOL_HTTP, true + case "http2": + return pbcatalog.Protocol_PROTOCOL_HTTP2, true + case "grpc": + return pbcatalog.Protocol_PROTOCOL_GRPC, true + case "mesh": + return pbcatalog.Protocol_PROTOCOL_MESH, true + default: + return pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, false + } +} + +type Port struct { + Number int + Protocol string `json:",omitempty"` + + // denormalized at topology compile + ActualProtocol pbcatalog.Protocol `json:",omitempty"` +} + // TODO(rb): really this should now be called "workload" or "instance" type Service struct { ID ServiceID @@ -728,15 +755,7 @@ type Service struct { // Ports is the v2 multi-port list for this service. // // This only applies for multi-port (v2). - Ports map[string]int `json:",omitempty"` - - // ExposedPort is the exposed docker port corresponding to 'Port'. - ExposedPort int `json:",omitempty"` - - // ExposedPorts are the exposed docker ports corresponding to 'Ports'. - // - // This only applies for multi-port (v2). - ExposedPorts map[string]int `json:",omitempty"` + Ports map[string]*Port `json:",omitempty"` // V2Services contains service names (which are merged with the tenancy // info from ID) to resolve services in the Services slice in the Cluster @@ -748,14 +767,14 @@ type Service struct { // This only applies for multi-port (v2). 
V2Services []string `json:",omitempty"` - // WorkloadIdentities contains named WorkloadIdentities to assign to this + // WorkloadIdentity contains named WorkloadIdentity to assign to this // workload. // // If omitted it is inferred that the ID.Name field is the singular // identity for this workload. // // This only applies for multi-port (v2). - WorkloadIdentities []string `json:",omitempty"` + WorkloadIdentity string `json:",omitempty"` Disabled bool `json:",omitempty"` // TODO @@ -774,9 +793,11 @@ type Service struct { Command []string `json:",omitempty"` // optional Env []string `json:",omitempty"` // optional - DisableServiceMesh bool `json:",omitempty"` - IsMeshGateway bool `json:",omitempty"` - Upstreams []*Upstream + EnableTransparentProxy bool `json:",omitempty"` + DisableServiceMesh bool `json:",omitempty"` + IsMeshGateway bool `json:",omitempty"` + Upstreams []*Upstream `json:",omitempty"` + ImpliedUpstreams []*Upstream `json:",omitempty"` // denormalized at topology compile Node *Node `json:"-"` @@ -784,9 +805,28 @@ type Service struct { Workload string `json:"-"` } +func (s *Service) ExposedPort(name string) int { + if s.Node == nil { + panic("ExposedPort cannot be called until after Compile") + } + + var internalPort int + if name == "" { + internalPort = s.Port + } else { + port, ok := s.Ports[name] + if !ok { + panic("port with name " + name + " not present on service") + } + internalPort = port.Number + } + + return s.Node.ExposedPort(internalPort) +} + func (s *Service) PortOrDefault(name string) int { if len(s.Ports) > 0 { - return s.Ports[name] + return s.Ports[name].Number } return s.Port } @@ -800,8 +840,6 @@ func (s *Service) IsV1() bool { } func (s *Service) inheritFromExisting(existing *Service) { - s.ExposedPort = existing.ExposedPort - s.ExposedPorts = existing.ExposedPorts s.ExposedEnvoyAdminPort = existing.ExposedEnvoyAdminPort } @@ -810,10 +848,10 @@ func (s *Service) ports() []int { if len(s.Ports) > 0 { seen := make(map[int]struct{}) for _, port := range s.Ports { - if _, ok := seen[port]; !ok { + if _, ok := seen[port.Number]; !ok { // It's totally fine to expose the same port twice in a workload. 
- seen[port] = struct{}{} - out = append(out, port) + seen[port.Number] = struct{}{} + out = append(out, port.Number) } } } else if s.Port > 0 { @@ -838,7 +876,6 @@ func (s *Service) HasCheck() bool { } func (s *Service) DigestExposedPorts(ports map[int]int) { - s.ExposedPort = ports[s.Port] if s.EnvoyAdminPort > 0 { s.ExposedEnvoyAdminPort = ports[s.EnvoyAdminPort] } else { @@ -858,15 +895,39 @@ func (s *Service) Validate() error { return fmt.Errorf("cannot specify both singleport and multiport on service in v2") } if s.Port > 0 { - s.Ports = map[string]int{"legacy": s.Port} + s.Ports = map[string]*Port{ + "legacy": { + Number: s.Port, + Protocol: "tcp", + }, + } s.Port = 0 } - for name, port := range s.Ports { - if port <= 0 { - return fmt.Errorf("service has invalid port %q", name) + if !s.DisableServiceMesh && s.EnvoyPublicListenerPort > 0 { + s.Ports["mesh"] = &Port{ + Number: s.EnvoyPublicListenerPort, + Protocol: "mesh", } } + + for name, port := range s.Ports { + if port == nil { + return fmt.Errorf("cannot be nil") + } + if port.Number <= 0 { + return fmt.Errorf("service has invalid port number %q", name) + } + if port.ActualProtocol != pbcatalog.Protocol_PROTOCOL_UNSPECIFIED { + return fmt.Errorf("user cannot specify ActualProtocol field") + } + + proto, valid := Protocol(port.Protocol) + if !valid { + return fmt.Errorf("service has invalid port protocol %q", port.Protocol) + } + port.ActualProtocol = proto + } } else { if len(s.Ports) > 0 { return fmt.Errorf("cannot specify mulitport on service in v1") @@ -874,6 +935,9 @@ func (s *Service) Validate() error { if s.Port <= 0 { return fmt.Errorf("service has invalid port") } + if s.EnableTransparentProxy { + return fmt.Errorf("tproxy does not work with v1 yet") + } } if s.DisableServiceMesh && s.IsMeshGateway { return fmt.Errorf("cannot disable service mesh and still run a mesh gateway") @@ -881,6 +945,12 @@ func (s *Service) Validate() error { if s.DisableServiceMesh && len(s.Upstreams) > 0 { return fmt.Errorf("cannot disable service mesh and configure upstreams") } + if s.DisableServiceMesh && len(s.ImpliedUpstreams) > 0 { + return fmt.Errorf("cannot disable service mesh and configure implied upstreams") + } + if s.DisableServiceMesh && s.EnableTransparentProxy { + return fmt.Errorf("cannot disable service mesh and activate tproxy") + } if s.DisableServiceMesh { if s.EnvoyAdminPort != 0 { @@ -906,6 +976,20 @@ func (s *Service) Validate() error { return fmt.Errorf("upstream local address is invalid: %s", u.LocalAddress) } } + if u.Implied { + return fmt.Errorf("implied field cannot be set") + } + } + for _, u := range s.ImpliedUpstreams { + if u.ID.Name == "" { + return fmt.Errorf("implied upstream service name is required") + } + if u.LocalPort > 0 { + return fmt.Errorf("implied upstream local port cannot be set") + } + if u.LocalAddress != "" { + return fmt.Errorf("implied upstream local address cannot be set") + } } return nil @@ -924,8 +1008,10 @@ type Upstream struct { // TODO: what about mesh gateway mode overrides? // computed at topology compile - Cluster string `json:",omitempty"` - Peering *PeerCluster `json:",omitempty"` // this will have Link!=nil + Cluster string `json:",omitempty"` + Peering *PeerCluster `json:",omitempty"` // this will have Link!=nil + Implied bool `json:",omitempty"` + VirtualPort uint32 `json:",omitempty"` } type Peering struct {
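Putting the topology changes together, a test author opts a v2 service into
tproxy roughly as in the sketch below. The types and field names come from
this patch; the concrete port numbers and service names are illustrative, and
running it requires the consul repo's Go module:

package main

import (
	"fmt"

	"github.com/hashicorp/consul/testing/deployer/topology"
)

func main() {
	// A v2 multiport workload that opts into transparent proxy. With tproxy
	// there are no explicit upstream listeners: ImpliedUpstreams only names
	// the destination service and port, and topology compile assigns the
	// virtual IP (10.244.0.0/24) and virtual port used to reach it.
	svc := &topology.Service{
		ID: topology.ServiceID{
			Partition: "default",
			Namespace: "default",
			Name:      "static-client",
		},
		Ports: map[string]*topology.Port{
			// TODO(rb/v2): stays "tcp" until L7 works in v2.
			"http": {Number: 8080, Protocol: "tcp"},
		},
		EnableTransparentProxy: true,
		ImpliedUpstreams: []*topology.Upstream{
			{ID: topology.ServiceID{Name: "static-server"}, PortName: "http"},
		},
	}

	// The new Protocol helper maps the string form onto the pbcatalog enum;
	// Validate() uses it to fill each port's ActualProtocol during compile.
	p, ok := topology.Protocol(svc.Ports["http"].Protocol)
	fmt.Println(p, ok) // PROTOCOL_TCP true
}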