From a7f3069a948ad5a1dcd1872e2b8607d28f4b3785 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Wed, 8 Nov 2023 17:20:00 -0600 Subject: [PATCH] test: add a v2 container integration test of xRoute splits (#19570) This adds a deployer-based integration test verifying that a 90/10 traffic split works for: HTTPRoute, GRPCRoute, and TCPRoute. --- .../explicit_destinations_l7_test.go | 448 ++++++++++++++++++ .../catalogv2/explicit_destinations_test.go | 6 +- test-integ/catalogv2/helpers_test.go | 14 +- .../catalogv2/implicit_destinations_test.go | 2 +- test-integ/go.mod | 5 +- test-integ/go.sum | 2 + test-integ/topoutil/asserter.go | 4 +- test-integ/topoutil/asserter_blankspace.go | 303 ++++++++++++ test-integ/topoutil/blankspace.go | 124 +++++ test-integ/topoutil/fixtures.go | 58 ++- test-integ/topoutil/http2.go | 32 ++ .../deployer/sprawl/internal/build/docker.go | 4 +- testing/deployer/sprawl/internal/tfgen/gen.go | 4 +- testing/deployer/topology/images.go | 20 +- testing/deployer/topology/topology.go | 9 + 15 files changed, 996 insertions(+), 39 deletions(-) create mode 100644 test-integ/catalogv2/explicit_destinations_l7_test.go create mode 100644 test-integ/topoutil/asserter_blankspace.go create mode 100644 test-integ/topoutil/blankspace.go create mode 100644 test-integ/topoutil/http2.go diff --git a/test-integ/catalogv2/explicit_destinations_l7_test.go b/test-integ/catalogv2/explicit_destinations_l7_test.go new file mode 100644 index 0000000000..5581ab2772 --- /dev/null +++ b/test-integ/catalogv2/explicit_destinations_l7_test.go @@ -0,0 +1,448 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package catalogv2 + +import ( + "fmt" + "testing" + + pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" + libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" + "github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest" + "github.com/hashicorp/consul/testing/deployer/topology" + + "github.com/hashicorp/consul/test-integ/topoutil" +) + +func TestSplitterFeaturesL7ExplicitDestinations(t *testing.T) { + cfg := testSplitterFeaturesL7ExplicitDestinationsCreator{}.NewConfig(t) + + sp := sprawltest.Launch(t, cfg) + + var ( + asserter = topoutil.NewAsserter(sp) + + topo = sp.Topology() + cluster = topo.Clusters["dc1"] + + ships = topo.ComputeRelationships() + ) + + clientV2 := sp.ResourceServiceClientForCluster(cluster.Name) + + t.Log(topology.RenderRelationships(ships)) + + // Make sure things are in v2. + libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-client", nil, 1) + libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-server-v1", nil, 1) + libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-server-v2", nil, 1) + libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-server", nil, 0) + + // Check relationships + for _, ship := range ships { + t.Run("relationship: "+ship.String(), func(t *testing.T) { + var ( + svc = ship.Caller + u = ship.Upstream + ) + + v1ID := u.ID + v1ID.Name = "static-server-v1" + v1ClusterPrefix := clusterPrefix(u.PortName, v1ID, u.Cluster) + + v2ID := u.ID + v2ID.Name = "static-server-v2" + v2ClusterPrefix := clusterPrefix(u.PortName, v2ID, u.Cluster) + + // we expect 2 clusters, one for each leg of the split + asserter.UpstreamEndpointStatus(t, svc, v1ClusterPrefix+".", "HEALTHY", 1) + asserter.UpstreamEndpointStatus(t, svc, v2ClusterPrefix+".", "HEALTHY", 1) + + // Both should be possible. + v1Expect := fmt.Sprintf("%s::%s", cluster.Name, v1ID.String()) + v2Expect := fmt.Sprintf("%s::%s", cluster.Name, v2ID.String()) + + switch u.PortName { + case "tcp": + asserter.CheckBlankspaceNameTrafficSplitViaTCP(t, svc, u, + map[string]int{v1Expect: 10, v2Expect: 90}) + case "grpc": + asserter.CheckBlankspaceNameTrafficSplitViaGRPC(t, svc, u, + map[string]int{v1Expect: 10, v2Expect: 90}) + case "http": + asserter.CheckBlankspaceNameTrafficSplitViaHTTP(t, svc, u, false, "/", + map[string]int{v1Expect: 10, v2Expect: 90}) + case "http2": + asserter.CheckBlankspaceNameTrafficSplitViaHTTP(t, svc, u, true, "/", + map[string]int{v1Expect: 10, v2Expect: 90}) + default: + t.Fatalf("unexpected port name: %s", u.PortName) + } + }) + } +} + +type testSplitterFeaturesL7ExplicitDestinationsCreator struct{} + +func (c testSplitterFeaturesL7ExplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config { + const clusterName = "dc1" + + servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil) + + cluster := &topology.Cluster{ + Enterprise: utils.IsEnterprise(), + Name: clusterName, + Nodes: servers, + } + + lastNode := 0 + nodeName := func() string { + lastNode++ + return fmt.Sprintf("%s-box%d", clusterName, lastNode) + } + + c.topologyConfigAddNodes(t, cluster, nodeName, "default", "default") + if cluster.Enterprise { + c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "default") + c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "nsa") + c.topologyConfigAddNodes(t, cluster, nodeName, "default", "nsa") + } + + return &topology.Config{ + Images: utils.TargetImages(), + Networks: []*topology.Network{ + {Name: clusterName}, + {Name: "wan", Type: "wan"}, + }, + Clusters: []*topology.Cluster{ + cluster, + }, + } +} + +func (c testSplitterFeaturesL7ExplicitDestinationsCreator) topologyConfigAddNodes( + t *testing.T, + cluster *topology.Cluster, + nodeName func() string, + partition, + namespace string, +) { + clusterName := cluster.Name + + newServiceID := func(name string) topology.ServiceID { + return topology.ServiceID{ + Partition: partition, + Namespace: namespace, + Name: name, + } + } + + tenancy := &pbresource.Tenancy{ + Partition: partition, + Namespace: namespace, + PeerName: "local", + } + + v1ServerNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewBlankspaceServiceWithDefaults( + clusterName, + newServiceID("static-server-v1"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.Meta = map[string]string{ + "version": "v1", + } + svc.WorkloadIdentity = "static-server-v1" + }, + ), + }, + } + v2ServerNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewBlankspaceServiceWithDefaults( + clusterName, + newServiceID("static-server-v2"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.Meta = map[string]string{ + "version": "v2", + } + svc.WorkloadIdentity = "static-server-v2" + }, + ), + }, + } + clientNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewBlankspaceServiceWithDefaults( + clusterName, + newServiceID("static-client"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.Upstreams = []*topology.Upstream{ + { + ID: newServiceID("static-server"), + PortName: "http", + LocalAddress: "0.0.0.0", // needed for an assertion + LocalPort: 5000, + }, + { + ID: newServiceID("static-server"), + PortName: "http2", + LocalAddress: "0.0.0.0", // needed for an assertion + LocalPort: 5001, + }, + { + ID: newServiceID("static-server"), + PortName: "grpc", + LocalAddress: "0.0.0.0", // needed for an assertion + LocalPort: 5002, + }, + { + ID: newServiceID("static-server"), + PortName: "tcp", + LocalAddress: "0.0.0.0", // needed for an assertion + LocalPort: 5003, + }, + } + }, + ), + }, + } + + v1TrafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbauth.TrafficPermissionsType, + Name: "static-server-v1-perms", + Tenancy: tenancy, + }, + }, &pbauth.TrafficPermissions{ + Destination: &pbauth.Destination{ + IdentityName: "static-server-v1", + }, + Action: pbauth.Action_ACTION_ALLOW, + Permissions: []*pbauth.Permission{{ + Sources: []*pbauth.Source{{ + IdentityName: "static-client", + Namespace: namespace, + }}, + }}, + }) + v2TrafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbauth.TrafficPermissionsType, + Name: "static-server-v2-perms", + Tenancy: tenancy, + }, + }, &pbauth.TrafficPermissions{ + Destination: &pbauth.Destination{ + IdentityName: "static-server-v2", + }, + Action: pbauth.Action_ACTION_ALLOW, + Permissions: []*pbauth.Permission{{ + Sources: []*pbauth.Source{{ + IdentityName: "static-client", + Namespace: namespace, + }}, + }}, + }) + + staticServerService := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbcatalog.ServiceType, + Name: "static-server", + Tenancy: tenancy, + }, + }, &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + // This will result in a 50/50 uncontrolled split. + Prefixes: []string{"static-server-"}, + }, + Ports: []*pbcatalog.ServicePort{ + { + TargetPort: "http", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + { + TargetPort: "http2", + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP2, + }, + { + TargetPort: "grpc", + Protocol: pbcatalog.Protocol_PROTOCOL_GRPC, + }, + { + TargetPort: "tcp", + Protocol: pbcatalog.Protocol_PROTOCOL_TCP, + }, + { + TargetPort: "mesh", + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + }) + + httpServerRoute := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbmesh.HTTPRouteType, + Name: "static-server-http-route", + Tenancy: tenancy, + }, + }, &pbmesh.HTTPRoute{ + ParentRefs: []*pbmesh.ParentReference{ + { + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server", + Tenancy: tenancy, + }, + Port: "http", + }, + { + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server", + Tenancy: tenancy, + }, + Port: "http2", + }, + }, + Rules: []*pbmesh.HTTPRouteRule{{ + BackendRefs: []*pbmesh.HTTPBackendRef{ + { + BackendRef: &pbmesh.BackendReference{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server-v1", + Tenancy: tenancy, + }, + }, + Weight: 10, + }, + { + BackendRef: &pbmesh.BackendReference{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server-v2", + Tenancy: tenancy, + }, + }, + Weight: 90, + }, + }, + }}, + }) + grpcServerRoute := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbmesh.GRPCRouteType, + Name: "static-server-grpc-route", + Tenancy: tenancy, + }, + }, &pbmesh.GRPCRoute{ + ParentRefs: []*pbmesh.ParentReference{{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server", + Tenancy: tenancy, + }, + Port: "grpc", + }}, + Rules: []*pbmesh.GRPCRouteRule{{ + BackendRefs: []*pbmesh.GRPCBackendRef{ + { + BackendRef: &pbmesh.BackendReference{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server-v1", + Tenancy: tenancy, + }, + }, + Weight: 10, + }, + { + BackendRef: &pbmesh.BackendReference{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server-v2", + Tenancy: tenancy, + }, + }, + Weight: 90, + }, + }, + }}, + }) + tcpServerRoute := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbmesh.TCPRouteType, + Name: "static-server-tcp-route", + Tenancy: tenancy, + }, + }, &pbmesh.TCPRoute{ + ParentRefs: []*pbmesh.ParentReference{{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server", + Tenancy: tenancy, + }, + Port: "tcp", + }}, + Rules: []*pbmesh.TCPRouteRule{{ + BackendRefs: []*pbmesh.TCPBackendRef{ + { + BackendRef: &pbmesh.BackendReference{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server-v1", + Tenancy: tenancy, + }, + }, + Weight: 10, + }, + { + BackendRef: &pbmesh.BackendReference{ + Ref: &pbresource.Reference{ + Type: pbcatalog.ServiceType, + Name: "static-server-v2", + Tenancy: tenancy, + }, + }, + Weight: 90, + }, + }, + }}, + }) + + cluster.Nodes = append(cluster.Nodes, + clientNode, + v1ServerNode, + v2ServerNode, + ) + + cluster.InitialResources = append(cluster.InitialResources, + staticServerService, + v1TrafficPerms, + v2TrafficPerms, + httpServerRoute, + tcpServerRoute, + grpcServerRoute, + ) +} diff --git a/test-integ/catalogv2/explicit_destinations_test.go b/test-integ/catalogv2/explicit_destinations_test.go index fe8d751e4a..870268d994 100644 --- a/test-integ/catalogv2/explicit_destinations_test.go +++ b/test-integ/catalogv2/explicit_destinations_test.go @@ -167,8 +167,8 @@ func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes( newServiceID("single-client"), topology.NodeVersionV2, func(svc *topology.Service) { - delete(svc.Ports, "grpc") // v2 mode turns this on, so turn it off - delete(svc.Ports, "http-alt") // v2 mode turns this on, so turn it off + delete(svc.Ports, "grpc") // v2 mode turns this on, so turn it off + delete(svc.Ports, "http2") // v2 mode turns this on, so turn it off svc.Upstreams = []*topology.Upstream{{ ID: newServiceID("single-server"), PortName: "http", @@ -232,7 +232,7 @@ func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes( }, { ID: newServiceID("multi-server"), - PortName: "http-alt", + PortName: "http2", LocalAddress: "0.0.0.0", // needed for an assertion LocalPort: 5001, }, diff --git a/test-integ/catalogv2/helpers_test.go b/test-integ/catalogv2/helpers_test.go index f5e352779a..a646f3388a 100644 --- a/test-integ/catalogv2/helpers_test.go +++ b/test-integ/catalogv2/helpers_test.go @@ -11,12 +11,16 @@ import ( func clusterPrefixForUpstream(u *topology.Upstream) string { if u.Peer == "" { - if u.ID.PartitionOrDefault() == "default" { - return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".") - } else { - return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".") - } + return clusterPrefix(u.PortName, u.ID, u.Cluster) } else { return strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".") } } + +func clusterPrefix(port string, svcID topology.ServiceID, cluster string) string { + if svcID.PartitionOrDefault() == "default" { + return strings.Join([]string{port, svcID.Name, svcID.Namespace, cluster, "internal"}, ".") + } else { + return strings.Join([]string{port, svcID.Name, svcID.Namespace, svcID.Partition, cluster, "internal-v1"}, ".") + } +} diff --git a/test-integ/catalogv2/implicit_destinations_test.go b/test-integ/catalogv2/implicit_destinations_test.go index da3180bcf7..4a7749c73c 100644 --- a/test-integ/catalogv2/implicit_destinations_test.go +++ b/test-integ/catalogv2/implicit_destinations_test.go @@ -177,7 +177,7 @@ func (c testBasicL4ImplicitDestinationsCreator) topologyConfigAddNodes( }, { ID: newServiceID("static-server"), - PortName: "http-alt", + PortName: "http2", }, } }, diff --git a/test-integ/go.mod b/test-integ/go.mod index 44dc740f75..19ef117338 100644 --- a/test-integ/go.mod +++ b/test-integ/go.mod @@ -11,7 +11,10 @@ require ( github.com/hashicorp/go-cleanhttp v0.5.2 github.com/itchyny/gojq v0.12.13 github.com/mitchellh/copystructure v1.2.0 + github.com/rboyer/blankspace v0.2.1 github.com/stretchr/testify v1.8.4 + golang.org/x/net v0.17.0 + google.golang.org/grpc v1.57.2 ) require ( @@ -97,12 +100,10 @@ require ( golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect - google.golang.org/grpc v1.57.2 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test-integ/go.sum b/test-integ/go.sum index d5338549f7..898d5bb822 100644 --- a/test-integ/go.sum +++ b/test-integ/go.sum @@ -259,6 +259,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/rboyer/blankspace v0.2.1 h1:GzFPETXKOhuwS/jPRUTFIYo9I+RhafEIhnbPByg8S+c= +github.com/rboyer/blankspace v0.2.1/go.mod h1:GhnCkDlx1SYD6m4XCde73ncQ8pFTLSJvlCNmCMg2moQ= github.com/rboyer/safeio v0.2.3 h1:gUybicx1kp8nuM4vO0GA5xTBX58/OBd8MQuErBfDxP8= github.com/rboyer/safeio v0.2.3/go.mod h1:d7RMmt7utQBJZ4B7f0H/cU/EdZibQAU1Y8NWepK2dS8= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= diff --git a/test-integ/topoutil/asserter.go b/test-integ/topoutil/asserter.go index dd0500922c..b14b9821b8 100644 --- a/test-integ/topoutil/asserter.go +++ b/test-integ/topoutil/asserter.go @@ -260,7 +260,7 @@ func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioSvc *topology.Serv var ( node = fortioSvc.Node - addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault("http")) + addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault(upstream.PortName)) client = a.mustGetHTTPClient(t, node.Cluster) ) @@ -286,7 +286,7 @@ func (a *Asserter) FortioFetch2FortioName( var ( node = fortioSvc.Node - addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault("http")) + addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault(upstream.PortName)) client = a.mustGetHTTPClient(t, node.Cluster) ) diff --git a/test-integ/topoutil/asserter_blankspace.go b/test-integ/topoutil/asserter_blankspace.go new file mode 100644 index 0000000000..010e32db7d --- /dev/null +++ b/test-integ/topoutil/asserter_blankspace.go @@ -0,0 +1,303 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package topoutil + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/testing/deployer/topology" + "github.com/stretchr/testify/require" +) + +// CheckBlankspaceNameViaHTTP calls a copy of blankspace and asserts it arrived +// on the correct instance using HTTP1 or HTTP2. +func (a *Asserter) CheckBlankspaceNameViaHTTP( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + useHTTP2 bool, + path string, + clusterName string, + sid topology.ServiceID, +) { + t.Helper() + + a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) { + require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName) + }, func(r *retry.R) {}) +} + +// CheckBlankspaceNameTrafficSplitViaHTTP is like CheckBlankspaceNameViaHTTP +// but it is verifying a relative traffic split. +func (a *Asserter) CheckBlankspaceNameTrafficSplitViaHTTP( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + useHTTP2 bool, + path string, + expect map[string]int, +) { + t.Helper() + + got := make(map[string]int) + a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 100, func(_ *retry.R) { + got = make(map[string]int) + }, func(_ *retry.R, name string) { + got[name]++ + }, func(r *retry.R) { + assertTrafficSplitFor100Requests(r, got, expect) + }) +} + +func (a *Asserter) checkBlankspaceNameViaHTTPWithCallback( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + useHTTP2 bool, + path string, + count int, + resetFn func(r *retry.R), + attemptFn func(r *retry.R, remoteName string), + checkFn func(r *retry.R), +) { + t.Helper() + + var ( + node = service.Node + internalPort = service.PortOrDefault(upstream.PortName) + addr = fmt.Sprintf("%s:%d", node.LocalAddress(), internalPort) + client = a.mustGetHTTPClient(t, node.Cluster) + ) + + if useHTTP2 { + // We can't use the forward proxy for http2, so use the exposed port on localhost instead. + exposedPort := node.ExposedPort(internalPort) + require.True(t, exposedPort > 0) + + addr = fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort) + + // This will clear the proxy field on the transport. + client = EnableHTTP2(client) + } + + var actualURL string + if upstream.Implied { + actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s", + upstream.ID.Name, + upstream.ID.Namespace, + upstream.ID.Partition, + upstream.VirtualPort, + path, + ) + } else { + actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path) + } + + multiassert(t, count, resetFn, func(r *retry.R) { + name, err := GetBlankspaceNameViaHTTP(context.Background(), client, addr, actualURL) + require.NoError(r, err) + attemptFn(r, name) + }, func(r *retry.R) { + checkFn(r) + }) +} + +// CheckBlankspaceNameViaTCP calls a copy of blankspace and asserts it arrived +// on the correct instance using plain tcp sockets. +func (a *Asserter) CheckBlankspaceNameViaTCP( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + clusterName string, + sid topology.ServiceID, +) { + t.Helper() + + a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) { + require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName) + }, func(r *retry.R) {}) +} + +// CheckBlankspaceNameTrafficSplitViaTCP is like CheckBlankspaceNameViaTCP +// but it is verifying a relative traffic split. +func (a *Asserter) CheckBlankspaceNameTrafficSplitViaTCP( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + expect map[string]int, +) { + t.Helper() + + got := make(map[string]int) + a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 100, func(_ *retry.R) { + got = make(map[string]int) + }, func(_ *retry.R, name string) { + got[name]++ + }, func(r *retry.R) { + assertTrafficSplitFor100Requests(r, got, expect) + }) +} + +func (a *Asserter) checkBlankspaceNameViaTCPWithCallback( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + count int, + resetFn func(r *retry.R), + attemptFn func(r *retry.R, remoteName string), + checkFn func(r *retry.R), +) { + t.Helper() + + require.False(t, upstream.Implied, "helper does not support tproxy yet") + port := upstream.LocalPort + require.True(t, port > 0) + + node := service.Node + + // We can't use the forward proxy for TCP yet, so use the exposed port on localhost instead. + exposedPort := node.ExposedPort(port) + require.True(t, exposedPort > 0) + + addr := fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort) + + multiassert(t, count, resetFn, func(r *retry.R) { + name, err := GetBlankspaceNameViaTCP(context.Background(), addr) + require.NoError(r, err) + attemptFn(r, name) + }, func(r *retry.R) { + checkFn(r) + }) +} + +// CheckBlankspaceNameViaGRPC calls a copy of blankspace and asserts it arrived +// on the correct instance using gRPC. +func (a *Asserter) CheckBlankspaceNameViaGRPC( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + clusterName string, + sid topology.ServiceID, +) { + t.Helper() + + a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) { + require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName) + }, func(_ *retry.R) {}) +} + +// CheckBlankspaceNameTrafficSplitViaGRPC is like CheckBlankspaceNameViaGRPC +// but it is verifying a relative traffic split. +func (a *Asserter) CheckBlankspaceNameTrafficSplitViaGRPC( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + expect map[string]int, +) { + t.Helper() + + got := make(map[string]int) + a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 100, func(_ *retry.R) { + got = make(map[string]int) + }, func(_ *retry.R, name string) { + got[name]++ + }, func(r *retry.R) { + assertTrafficSplitFor100Requests(r, got, expect) + }) +} + +func (a *Asserter) checkBlankspaceNameViaGRPCWithCallback( + t *testing.T, + service *topology.Service, + upstream *topology.Upstream, + count int, + resetFn func(r *retry.R), + attemptFn func(r *retry.R, remoteName string), + checkFn func(r *retry.R), +) { + t.Helper() + + require.False(t, upstream.Implied, "helper does not support tproxy yet") + port := upstream.LocalPort + require.True(t, port > 0) + + node := service.Node + + // We can't use the forward proxy for gRPC yet, so use the exposed port on localhost instead. + exposedPort := node.ExposedPort(port) + require.True(t, exposedPort > 0) + + addr := fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort) + + multiassert(t, count, resetFn, func(r *retry.R) { + name, err := GetBlankspaceNameViaGRPC(context.Background(), addr) + require.NoError(r, err) + attemptFn(r, name) + }, func(r *retry.R) { + checkFn(r) + }) +} + +// assertTrafficSplitFor100Requests compares the counts of 100 requests that +// did reach an observed set of destinations (nameCounts) against the expected +// counts of those same services is the same within a fixed difference of 2. +func assertTrafficSplitFor100Requests(t require.TestingT, nameCounts map[string]int, expect map[string]int) { + const ( + numRequests = 100 + allowedDelta = 2 + ) + require.Equal(t, numRequests, sumMapValues(nameCounts), "measured traffic was not %d requests", numRequests) + require.Equal(t, numRequests, sumMapValues(expect), "expected traffic was not %d requests", numRequests) + assertTrafficSplit(t, nameCounts, expect, allowedDelta) +} + +func sumMapValues(m map[string]int) int { + sum := 0 + for _, v := range m { + sum += v + } + return sum +} + +// assertTrafficSplit compares the counts of requests that did reach an +// observed set of destinations (nameCounts) against the expected counts of +// those same services is the same within the provided allowedDelta value. +// +// When doing random traffic splits it'll never be perfect so we need the +// wiggle room to avoid having a flaky test. +func assertTrafficSplit(t require.TestingT, nameCounts map[string]int, expect map[string]int, allowedDelta int) { + require.Len(t, nameCounts, len(expect)) + for name, expectCount := range expect { + gotCount, ok := nameCounts[name] + require.True(t, ok) + if len(expect) == 1 { + require.Equal(t, expectCount, gotCount) + } else { + require.InDelta(t, expectCount, gotCount, float64(allowedDelta), + "expected %q side of split to have %d requests not %d (e=%d)", + name, expectCount, gotCount, allowedDelta, + ) + } + } +} + +// multiassert will retry in bulk calling attemptFn count times and following +// that with one last call to checkFn. +// +// It's primary use at the time it was written was to execute a set of requests +// repeatedly to witness where the requests went, and then at the end doing a +// verification of traffic splits (a bit like MAP/REDUCE). +func multiassert(t *testing.T, count int, resetFn, attemptFn, checkFn func(r *retry.R)) { + retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) { + resetFn(r) + for i := 0; i < count; i++ { + attemptFn(r) + } + checkFn(r) + }) +} diff --git a/test-integ/topoutil/blankspace.go b/test-integ/topoutil/blankspace.go new file mode 100644 index 0000000000..000853b942 --- /dev/null +++ b/test-integ/topoutil/blankspace.go @@ -0,0 +1,124 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package topoutil + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/rboyer/blankspace/blankpb" + "golang.org/x/net/http2" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// GetBlankspaceNameViaHTTP calls a copy of blankspace once via HTTP and +// retrieves the self-identified name of the instance. +func GetBlankspaceNameViaHTTP( + ctx context.Context, + client *http.Client, + serverAddr string, + actualURL string, +) (string, error) { + url := fmt.Sprintf("http://%s/fetch?url=%s", serverAddr, + url.QueryEscape(actualURL), + ) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + + res, err := client.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return "", fmt.Errorf("status code is not 200: %d", res.StatusCode) + } + + body, err := io.ReadAll(res.Body) + if err != nil { + return "", err + } + + var v struct { + Name string + } + if err := json.Unmarshal(body, &v); err != nil { + return "", err + } + + if _, useHTTP2 := client.Transport.(*http2.Transport); useHTTP2 { + if res.ProtoMajor < 2 { + return "", fmt.Errorf("should be using http > 1.x not %d", res.ProtoMajor) + } + } + + return v.Name, nil +} + +// GetBlankspaceNameViaGRPC calls a copy of blankspace once via gRPC and +// retrieves the self-identified name of the instance. +func GetBlankspaceNameViaGRPC(ctx context.Context, serverAddr string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return "", err + } + defer conn.Close() + + client := blankpb.NewServerClient(conn) + + resp, err := client.Describe(ctx, &blankpb.DescribeRequest{}) + if err != nil { + return "", fmt.Errorf("grpc error from Describe: %w", err) + } + + return resp.GetName(), nil +} + +// GetBlankspaceNameViaTCP calls a copy of blankspace once via tcp and +// retrieves the self-identified name of the instance. +func GetBlankspaceNameViaTCP(ctx context.Context, serverAddr string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + d := net.Dialer{ + Timeout: 5 * time.Second, + KeepAlive: 250 * time.Millisecond, + } + + conn, err := d.DialContext(ctx, "tcp", serverAddr) + if err != nil { + return "", fmt.Errorf("tcp error dialing: %w", err) + } + defer conn.Close() + + if _, err := conn.Write([]byte("describe\n")); err != nil { + return "", fmt.Errorf("error sending tcp request: %w", err) + } + + scan := bufio.NewScanner(conn) + + if !scan.Scan() { + return "", fmt.Errorf("server did not reply") + } + + name := strings.TrimSpace(scan.Text()) + + return name, nil +} diff --git a/test-integ/topoutil/fixtures.go b/test-integ/topoutil/fixtures.go index 9b1f958f44..06442dfeef 100644 --- a/test-integ/topoutil/fixtures.go +++ b/test-integ/topoutil/fixtures.go @@ -4,6 +4,7 @@ package topoutil import ( + "fmt" "strconv" "github.com/hashicorp/consul/testing/deployer/topology" @@ -20,6 +21,7 @@ func NewFortioServiceWithDefaults( const ( httpPort = 8080 grpcPort = 8079 + tcpPort = 8078 adminPort = 19000 ) sid.Normalize() @@ -36,19 +38,61 @@ func NewFortioServiceWithDefaults( "server", "-http-port", strconv.Itoa(httpPort), "-grpc-port", strconv.Itoa(grpcPort), + "-tcp-port", strconv.Itoa(tcpPort), "-redirect-port", "-disabled", }, } if nodeVersion == topology.NodeVersionV2 { svc.Ports = map[string]*topology.Port{ - // TODO(rb/v2): once L7 works in v2 switch these back - "http": {Number: httpPort, Protocol: "tcp"}, - "http-alt": {Number: httpPort, Protocol: "tcp"}, - "grpc": {Number: grpcPort, Protocol: "tcp"}, - // "http": {Number: httpPort, Protocol: "http"}, - // "http-alt": {Number: httpPort, Protocol: "http"}, - // "grpc": {Number: grpcPort, Protocol: "grpc"}, + "http": {Number: httpPort, Protocol: "http"}, + "http2": {Number: httpPort, Protocol: "http2"}, + "grpc": {Number: grpcPort, Protocol: "grpc"}, + "tcp": {Number: tcpPort, Protocol: "tcp"}, + } + } else { + svc.Port = httpPort + } + + if mut != nil { + mut(svc) + } + return svc +} + +func NewBlankspaceServiceWithDefaults( + cluster string, + sid topology.ServiceID, + nodeVersion topology.NodeVersion, + mut func(s *topology.Service), +) *topology.Service { + const ( + httpPort = 8080 + grpcPort = 8079 + tcpPort = 8078 + adminPort = 19000 + ) + sid.Normalize() + + svc := &topology.Service{ + ID: sid, + Image: HashicorpDockerProxy + "/rboyer/blankspace", + EnvoyAdminPort: adminPort, + CheckTCP: "127.0.0.1:" + strconv.Itoa(httpPort), + Command: []string{ + "-name", cluster + "::" + sid.String(), + "-http-addr", fmt.Sprintf(":%d", httpPort), + "-grpc-addr", fmt.Sprintf(":%d", grpcPort), + "-tcp-addr", fmt.Sprintf(":%d", tcpPort), + }, + } + + if nodeVersion == topology.NodeVersionV2 { + svc.Ports = map[string]*topology.Port{ + "http": {Number: httpPort, Protocol: "http"}, + "http2": {Number: httpPort, Protocol: "http2"}, + "grpc": {Number: grpcPort, Protocol: "grpc"}, + "tcp": {Number: tcpPort, Protocol: "tcp"}, } } else { svc.Port = httpPort diff --git a/test-integ/topoutil/http2.go b/test-integ/topoutil/http2.go new file mode 100644 index 0000000000..148074e563 --- /dev/null +++ b/test-integ/topoutil/http2.go @@ -0,0 +1,32 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package topoutil + +import ( + "context" + "crypto/tls" + "net" + "net/http" + + "golang.org/x/net/http2" +) + +// EnableHTTP2 returns a new shallow copy of client that has been tweaked to do +// h2c (cleartext http2). +// +// Note that this clears the Client.Transport.Proxy trick because http2 and +// http proxies are incompatible currently in Go. +func EnableHTTP2(client *http.Client) *http.Client { + // Shallow copy, and swap the transport + client2 := *client + client = &client2 + client.Transport = &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, network, addr) + }, + } + return client +} diff --git a/testing/deployer/sprawl/internal/build/docker.go b/testing/deployer/sprawl/internal/build/docker.go index 435fb0b506..53baa07ae7 100644 --- a/testing/deployer/sprawl/internal/build/docker.go +++ b/testing/deployer/sprawl/internal/build/docker.go @@ -102,6 +102,8 @@ func DockerImages( built := make(map[string]struct{}) for _, c := range t.Clusters { for _, n := range c.Nodes { + needsTproxy := n.NeedsTransparentProxy() + joint := n.Images.EnvoyConsulImage() if _, ok := built[joint]; joint != "" && !ok { logger.Info("building envoy+consul image", "image", joint) @@ -145,7 +147,7 @@ func DockerImages( } cdpTproxy := n.Images.LocalDataplaneTProxyImage() - if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok { + if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok && needsTproxy { logger.Info("building image", "image", cdpTproxy) logw := logger.Named("docker_dataplane_tproxy").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug}) err := run.DockerExecWithStderr(context.TODO(), []string{ diff --git a/testing/deployer/sprawl/internal/tfgen/gen.go b/testing/deployer/sprawl/internal/tfgen/gen.go index 0f38714b0c..3a7df0e56f 100644 --- a/testing/deployer/sprawl/internal/tfgen/gen.go +++ b/testing/deployer/sprawl/internal/tfgen/gen.go @@ -262,7 +262,9 @@ func (g *Generator) Generate(step Step) error { addImage("", node.Images.Consul) addImage("", node.Images.EnvoyConsulImage()) addImage("", node.Images.LocalDataplaneImage()) - addImage("", node.Images.LocalDataplaneTProxyImage()) + if node.NeedsTransparentProxy() { + addImage("", node.Images.LocalDataplaneTProxyImage()) + } if node.IsAgent() { addVolume(node.DockerName()) diff --git a/testing/deployer/topology/images.go b/testing/deployer/topology/images.go index 318154c582..a41adf5504 100644 --- a/testing/deployer/topology/images.go +++ b/testing/deployer/topology/images.go @@ -29,10 +29,7 @@ func (i Images) LocalDataplaneImage() string { tag = "latest" } - repo, name, ok := strings.Cut(img, "/") - if ok { - name = repo + "-" + name - } + name := strings.ReplaceAll(img, "/", "-") // ex: local/hashicorp-consul-dataplane:1.1.0 return "local/" + name + ":" + tag @@ -60,19 +57,8 @@ func spliceImageNamesAndTags(base1, base2, nameSuffix string) string { tag2 = "latest" } - repo1, name1, ok1 := strings.Cut(img1, "/") - repo2, name2, ok2 := strings.Cut(img2, "/") - - if ok1 { - name1 = repo1 + "-" + name1 - } else { - name1 = repo1 - } - if ok2 { - name2 = repo2 + "-" + name2 - } else { - name2 = repo2 - } + name1 := strings.ReplaceAll(img1, "/", "-") + name2 := strings.ReplaceAll(img2, "/", "-") if nameSuffix != "" { nameSuffix = "-" + nameSuffix diff --git a/testing/deployer/topology/topology.go b/testing/deployer/topology/topology.go index b59045b564..b4615cc827 100644 --- a/testing/deployer/topology/topology.go +++ b/testing/deployer/topology/topology.go @@ -679,6 +679,15 @@ func (n *Node) SortedServices() []*Service { return out } +func (n *Node) NeedsTransparentProxy() bool { + for _, svc := range n.Services { + if svc.EnableTransparentProxy { + return true + } + } + return false +} + // DigestExposedPorts returns true if it was changed. func (n *Node) DigestExposedPorts(ports map[int]int) bool { if reflect.DeepEqual(n.usedPorts, ports) {