test: add a v2 container integration test of xRoute splits (#19570)

This adds a deployer-based integration test verifying that a 90/10 traffic
split works for: HTTPRoute, GRPCRoute, and TCPRoute.
This commit is contained in:
R.B. Boyer 2023-11-08 17:20:00 -06:00 committed by GitHub
parent 7de0b45ba4
commit a7f3069a94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 996 additions and 39 deletions

View File

@ -0,0 +1,448 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package catalogv2
import (
"fmt"
"testing"
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/hashicorp/consul/test-integ/topoutil"
)
func TestSplitterFeaturesL7ExplicitDestinations(t *testing.T) {
cfg := testSplitterFeaturesL7ExplicitDestinationsCreator{}.NewConfig(t)
sp := sprawltest.Launch(t, cfg)
var (
asserter = topoutil.NewAsserter(sp)
topo = sp.Topology()
cluster = topo.Clusters["dc1"]
ships = topo.ComputeRelationships()
)
clientV2 := sp.ResourceServiceClientForCluster(cluster.Name)
t.Log(topology.RenderRelationships(ships))
// Make sure things are in v2.
libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-client", nil, 1)
libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-server-v1", nil, 1)
libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-server-v2", nil, 1)
libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, "static-server", nil, 0)
// Check relationships
for _, ship := range ships {
t.Run("relationship: "+ship.String(), func(t *testing.T) {
var (
svc = ship.Caller
u = ship.Upstream
)
v1ID := u.ID
v1ID.Name = "static-server-v1"
v1ClusterPrefix := clusterPrefix(u.PortName, v1ID, u.Cluster)
v2ID := u.ID
v2ID.Name = "static-server-v2"
v2ClusterPrefix := clusterPrefix(u.PortName, v2ID, u.Cluster)
// we expect 2 clusters, one for each leg of the split
asserter.UpstreamEndpointStatus(t, svc, v1ClusterPrefix+".", "HEALTHY", 1)
asserter.UpstreamEndpointStatus(t, svc, v2ClusterPrefix+".", "HEALTHY", 1)
// Both should be possible.
v1Expect := fmt.Sprintf("%s::%s", cluster.Name, v1ID.String())
v2Expect := fmt.Sprintf("%s::%s", cluster.Name, v2ID.String())
switch u.PortName {
case "tcp":
asserter.CheckBlankspaceNameTrafficSplitViaTCP(t, svc, u,
map[string]int{v1Expect: 10, v2Expect: 90})
case "grpc":
asserter.CheckBlankspaceNameTrafficSplitViaGRPC(t, svc, u,
map[string]int{v1Expect: 10, v2Expect: 90})
case "http":
asserter.CheckBlankspaceNameTrafficSplitViaHTTP(t, svc, u, false, "/",
map[string]int{v1Expect: 10, v2Expect: 90})
case "http2":
asserter.CheckBlankspaceNameTrafficSplitViaHTTP(t, svc, u, true, "/",
map[string]int{v1Expect: 10, v2Expect: 90})
default:
t.Fatalf("unexpected port name: %s", u.PortName)
}
})
}
}
type testSplitterFeaturesL7ExplicitDestinationsCreator struct{}
func (c testSplitterFeaturesL7ExplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config {
const clusterName = "dc1"
servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil)
cluster := &topology.Cluster{
Enterprise: utils.IsEnterprise(),
Name: clusterName,
Nodes: servers,
}
lastNode := 0
nodeName := func() string {
lastNode++
return fmt.Sprintf("%s-box%d", clusterName, lastNode)
}
c.topologyConfigAddNodes(t, cluster, nodeName, "default", "default")
if cluster.Enterprise {
c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "default")
c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "nsa")
c.topologyConfigAddNodes(t, cluster, nodeName, "default", "nsa")
}
return &topology.Config{
Images: utils.TargetImages(),
Networks: []*topology.Network{
{Name: clusterName},
{Name: "wan", Type: "wan"},
},
Clusters: []*topology.Cluster{
cluster,
},
}
}
func (c testSplitterFeaturesL7ExplicitDestinationsCreator) topologyConfigAddNodes(
t *testing.T,
cluster *topology.Cluster,
nodeName func() string,
partition,
namespace string,
) {
clusterName := cluster.Name
newServiceID := func(name string) topology.ServiceID {
return topology.ServiceID{
Partition: partition,
Namespace: namespace,
Name: name,
}
}
tenancy := &pbresource.Tenancy{
Partition: partition,
Namespace: namespace,
PeerName: "local",
}
v1ServerNode := &topology.Node{
Kind: topology.NodeKindDataplane,
Version: topology.NodeVersionV2,
Partition: partition,
Name: nodeName(),
Services: []*topology.Service{
topoutil.NewBlankspaceServiceWithDefaults(
clusterName,
newServiceID("static-server-v1"),
topology.NodeVersionV2,
func(svc *topology.Service) {
svc.Meta = map[string]string{
"version": "v1",
}
svc.WorkloadIdentity = "static-server-v1"
},
),
},
}
v2ServerNode := &topology.Node{
Kind: topology.NodeKindDataplane,
Version: topology.NodeVersionV2,
Partition: partition,
Name: nodeName(),
Services: []*topology.Service{
topoutil.NewBlankspaceServiceWithDefaults(
clusterName,
newServiceID("static-server-v2"),
topology.NodeVersionV2,
func(svc *topology.Service) {
svc.Meta = map[string]string{
"version": "v2",
}
svc.WorkloadIdentity = "static-server-v2"
},
),
},
}
clientNode := &topology.Node{
Kind: topology.NodeKindDataplane,
Version: topology.NodeVersionV2,
Partition: partition,
Name: nodeName(),
Services: []*topology.Service{
topoutil.NewBlankspaceServiceWithDefaults(
clusterName,
newServiceID("static-client"),
topology.NodeVersionV2,
func(svc *topology.Service) {
svc.Upstreams = []*topology.Upstream{
{
ID: newServiceID("static-server"),
PortName: "http",
LocalAddress: "0.0.0.0", // needed for an assertion
LocalPort: 5000,
},
{
ID: newServiceID("static-server"),
PortName: "http2",
LocalAddress: "0.0.0.0", // needed for an assertion
LocalPort: 5001,
},
{
ID: newServiceID("static-server"),
PortName: "grpc",
LocalAddress: "0.0.0.0", // needed for an assertion
LocalPort: 5002,
},
{
ID: newServiceID("static-server"),
PortName: "tcp",
LocalAddress: "0.0.0.0", // needed for an assertion
LocalPort: 5003,
},
}
},
),
},
}
v1TrafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{
Id: &pbresource.ID{
Type: pbauth.TrafficPermissionsType,
Name: "static-server-v1-perms",
Tenancy: tenancy,
},
}, &pbauth.TrafficPermissions{
Destination: &pbauth.Destination{
IdentityName: "static-server-v1",
},
Action: pbauth.Action_ACTION_ALLOW,
Permissions: []*pbauth.Permission{{
Sources: []*pbauth.Source{{
IdentityName: "static-client",
Namespace: namespace,
}},
}},
})
v2TrafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{
Id: &pbresource.ID{
Type: pbauth.TrafficPermissionsType,
Name: "static-server-v2-perms",
Tenancy: tenancy,
},
}, &pbauth.TrafficPermissions{
Destination: &pbauth.Destination{
IdentityName: "static-server-v2",
},
Action: pbauth.Action_ACTION_ALLOW,
Permissions: []*pbauth.Permission{{
Sources: []*pbauth.Source{{
IdentityName: "static-client",
Namespace: namespace,
}},
}},
})
staticServerService := sprawltest.MustSetResourceData(t, &pbresource.Resource{
Id: &pbresource.ID{
Type: pbcatalog.ServiceType,
Name: "static-server",
Tenancy: tenancy,
},
}, &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
// This will result in a 50/50 uncontrolled split.
Prefixes: []string{"static-server-"},
},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "http2",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP2,
},
{
TargetPort: "grpc",
Protocol: pbcatalog.Protocol_PROTOCOL_GRPC,
},
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
})
httpServerRoute := sprawltest.MustSetResourceData(t, &pbresource.Resource{
Id: &pbresource.ID{
Type: pbmesh.HTTPRouteType,
Name: "static-server-http-route",
Tenancy: tenancy,
},
}, &pbmesh.HTTPRoute{
ParentRefs: []*pbmesh.ParentReference{
{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server",
Tenancy: tenancy,
},
Port: "http",
},
{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server",
Tenancy: tenancy,
},
Port: "http2",
},
},
Rules: []*pbmesh.HTTPRouteRule{{
BackendRefs: []*pbmesh.HTTPBackendRef{
{
BackendRef: &pbmesh.BackendReference{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server-v1",
Tenancy: tenancy,
},
},
Weight: 10,
},
{
BackendRef: &pbmesh.BackendReference{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server-v2",
Tenancy: tenancy,
},
},
Weight: 90,
},
},
}},
})
grpcServerRoute := sprawltest.MustSetResourceData(t, &pbresource.Resource{
Id: &pbresource.ID{
Type: pbmesh.GRPCRouteType,
Name: "static-server-grpc-route",
Tenancy: tenancy,
},
}, &pbmesh.GRPCRoute{
ParentRefs: []*pbmesh.ParentReference{{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server",
Tenancy: tenancy,
},
Port: "grpc",
}},
Rules: []*pbmesh.GRPCRouteRule{{
BackendRefs: []*pbmesh.GRPCBackendRef{
{
BackendRef: &pbmesh.BackendReference{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server-v1",
Tenancy: tenancy,
},
},
Weight: 10,
},
{
BackendRef: &pbmesh.BackendReference{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server-v2",
Tenancy: tenancy,
},
},
Weight: 90,
},
},
}},
})
tcpServerRoute := sprawltest.MustSetResourceData(t, &pbresource.Resource{
Id: &pbresource.ID{
Type: pbmesh.TCPRouteType,
Name: "static-server-tcp-route",
Tenancy: tenancy,
},
}, &pbmesh.TCPRoute{
ParentRefs: []*pbmesh.ParentReference{{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server",
Tenancy: tenancy,
},
Port: "tcp",
}},
Rules: []*pbmesh.TCPRouteRule{{
BackendRefs: []*pbmesh.TCPBackendRef{
{
BackendRef: &pbmesh.BackendReference{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server-v1",
Tenancy: tenancy,
},
},
Weight: 10,
},
{
BackendRef: &pbmesh.BackendReference{
Ref: &pbresource.Reference{
Type: pbcatalog.ServiceType,
Name: "static-server-v2",
Tenancy: tenancy,
},
},
Weight: 90,
},
},
}},
})
cluster.Nodes = append(cluster.Nodes,
clientNode,
v1ServerNode,
v2ServerNode,
)
cluster.InitialResources = append(cluster.InitialResources,
staticServerService,
v1TrafficPerms,
v2TrafficPerms,
httpServerRoute,
tcpServerRoute,
grpcServerRoute,
)
}

View File

@ -168,7 +168,7 @@ func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes(
topology.NodeVersionV2, topology.NodeVersionV2,
func(svc *topology.Service) { func(svc *topology.Service) {
delete(svc.Ports, "grpc") // v2 mode turns this on, so turn it off delete(svc.Ports, "grpc") // v2 mode turns this on, so turn it off
delete(svc.Ports, "http-alt") // v2 mode turns this on, so turn it off delete(svc.Ports, "http2") // v2 mode turns this on, so turn it off
svc.Upstreams = []*topology.Upstream{{ svc.Upstreams = []*topology.Upstream{{
ID: newServiceID("single-server"), ID: newServiceID("single-server"),
PortName: "http", PortName: "http",
@ -232,7 +232,7 @@ func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes(
}, },
{ {
ID: newServiceID("multi-server"), ID: newServiceID("multi-server"),
PortName: "http-alt", PortName: "http2",
LocalAddress: "0.0.0.0", // needed for an assertion LocalAddress: "0.0.0.0", // needed for an assertion
LocalPort: 5001, LocalPort: 5001,
}, },

View File

@ -11,12 +11,16 @@ import (
func clusterPrefixForUpstream(u *topology.Upstream) string { func clusterPrefixForUpstream(u *topology.Upstream) string {
if u.Peer == "" { if u.Peer == "" {
if u.ID.PartitionOrDefault() == "default" { return clusterPrefix(u.PortName, u.ID, u.Cluster)
return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".")
} else {
return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".")
}
} else { } else {
return strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".") return strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".")
} }
} }
func clusterPrefix(port string, svcID topology.ServiceID, cluster string) string {
if svcID.PartitionOrDefault() == "default" {
return strings.Join([]string{port, svcID.Name, svcID.Namespace, cluster, "internal"}, ".")
} else {
return strings.Join([]string{port, svcID.Name, svcID.Namespace, svcID.Partition, cluster, "internal-v1"}, ".")
}
}

View File

@ -177,7 +177,7 @@ func (c testBasicL4ImplicitDestinationsCreator) topologyConfigAddNodes(
}, },
{ {
ID: newServiceID("static-server"), ID: newServiceID("static-server"),
PortName: "http-alt", PortName: "http2",
}, },
} }
}, },

View File

@ -11,7 +11,10 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-cleanhttp v0.5.2
github.com/itchyny/gojq v0.12.13 github.com/itchyny/gojq v0.12.13
github.com/mitchellh/copystructure v1.2.0 github.com/mitchellh/copystructure v1.2.0
github.com/rboyer/blankspace v0.2.1
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.8.4
golang.org/x/net v0.17.0
google.golang.org/grpc v1.57.2
) )
require ( require (
@ -97,12 +100,10 @@ require (
golang.org/x/crypto v0.14.0 // indirect golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/mod v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/grpc v1.57.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

View File

@ -259,6 +259,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/rboyer/blankspace v0.2.1 h1:GzFPETXKOhuwS/jPRUTFIYo9I+RhafEIhnbPByg8S+c=
github.com/rboyer/blankspace v0.2.1/go.mod h1:GhnCkDlx1SYD6m4XCde73ncQ8pFTLSJvlCNmCMg2moQ=
github.com/rboyer/safeio v0.2.3 h1:gUybicx1kp8nuM4vO0GA5xTBX58/OBd8MQuErBfDxP8= github.com/rboyer/safeio v0.2.3 h1:gUybicx1kp8nuM4vO0GA5xTBX58/OBd8MQuErBfDxP8=
github.com/rboyer/safeio v0.2.3/go.mod h1:d7RMmt7utQBJZ4B7f0H/cU/EdZibQAU1Y8NWepK2dS8= github.com/rboyer/safeio v0.2.3/go.mod h1:d7RMmt7utQBJZ4B7f0H/cU/EdZibQAU1Y8NWepK2dS8=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=

View File

@ -260,7 +260,7 @@ func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioSvc *topology.Serv
var ( var (
node = fortioSvc.Node node = fortioSvc.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault("http")) addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault(upstream.PortName))
client = a.mustGetHTTPClient(t, node.Cluster) client = a.mustGetHTTPClient(t, node.Cluster)
) )
@ -286,7 +286,7 @@ func (a *Asserter) FortioFetch2FortioName(
var ( var (
node = fortioSvc.Node node = fortioSvc.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault("http")) addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioSvc.PortOrDefault(upstream.PortName))
client = a.mustGetHTTPClient(t, node.Cluster) client = a.mustGetHTTPClient(t, node.Cluster)
) )

View File

@ -0,0 +1,303 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topoutil
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/stretchr/testify/require"
)
// CheckBlankspaceNameViaHTTP calls a copy of blankspace and asserts it arrived
// on the correct instance using HTTP1 or HTTP2.
func (a *Asserter) CheckBlankspaceNameViaHTTP(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
useHTTP2 bool,
path string,
clusterName string,
sid topology.ServiceID,
) {
t.Helper()
a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) {
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName)
}, func(r *retry.R) {})
}
// CheckBlankspaceNameTrafficSplitViaHTTP is like CheckBlankspaceNameViaHTTP
// but it is verifying a relative traffic split.
func (a *Asserter) CheckBlankspaceNameTrafficSplitViaHTTP(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
useHTTP2 bool,
path string,
expect map[string]int,
) {
t.Helper()
got := make(map[string]int)
a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 100, func(_ *retry.R) {
got = make(map[string]int)
}, func(_ *retry.R, name string) {
got[name]++
}, func(r *retry.R) {
assertTrafficSplitFor100Requests(r, got, expect)
})
}
func (a *Asserter) checkBlankspaceNameViaHTTPWithCallback(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
useHTTP2 bool,
path string,
count int,
resetFn func(r *retry.R),
attemptFn func(r *retry.R, remoteName string),
checkFn func(r *retry.R),
) {
t.Helper()
var (
node = service.Node
internalPort = service.PortOrDefault(upstream.PortName)
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), internalPort)
client = a.mustGetHTTPClient(t, node.Cluster)
)
if useHTTP2 {
// We can't use the forward proxy for http2, so use the exposed port on localhost instead.
exposedPort := node.ExposedPort(internalPort)
require.True(t, exposedPort > 0)
addr = fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort)
// This will clear the proxy field on the transport.
client = EnableHTTP2(client)
}
var actualURL string
if upstream.Implied {
actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s",
upstream.ID.Name,
upstream.ID.Namespace,
upstream.ID.Partition,
upstream.VirtualPort,
path,
)
} else {
actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)
}
multiassert(t, count, resetFn, func(r *retry.R) {
name, err := GetBlankspaceNameViaHTTP(context.Background(), client, addr, actualURL)
require.NoError(r, err)
attemptFn(r, name)
}, func(r *retry.R) {
checkFn(r)
})
}
// CheckBlankspaceNameViaTCP calls a copy of blankspace and asserts it arrived
// on the correct instance using plain tcp sockets.
func (a *Asserter) CheckBlankspaceNameViaTCP(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
clusterName string,
sid topology.ServiceID,
) {
t.Helper()
a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) {
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName)
}, func(r *retry.R) {})
}
// CheckBlankspaceNameTrafficSplitViaTCP is like CheckBlankspaceNameViaTCP
// but it is verifying a relative traffic split.
func (a *Asserter) CheckBlankspaceNameTrafficSplitViaTCP(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
expect map[string]int,
) {
t.Helper()
got := make(map[string]int)
a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 100, func(_ *retry.R) {
got = make(map[string]int)
}, func(_ *retry.R, name string) {
got[name]++
}, func(r *retry.R) {
assertTrafficSplitFor100Requests(r, got, expect)
})
}
func (a *Asserter) checkBlankspaceNameViaTCPWithCallback(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
count int,
resetFn func(r *retry.R),
attemptFn func(r *retry.R, remoteName string),
checkFn func(r *retry.R),
) {
t.Helper()
require.False(t, upstream.Implied, "helper does not support tproxy yet")
port := upstream.LocalPort
require.True(t, port > 0)
node := service.Node
// We can't use the forward proxy for TCP yet, so use the exposed port on localhost instead.
exposedPort := node.ExposedPort(port)
require.True(t, exposedPort > 0)
addr := fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort)
multiassert(t, count, resetFn, func(r *retry.R) {
name, err := GetBlankspaceNameViaTCP(context.Background(), addr)
require.NoError(r, err)
attemptFn(r, name)
}, func(r *retry.R) {
checkFn(r)
})
}
// CheckBlankspaceNameViaGRPC calls a copy of blankspace and asserts it arrived
// on the correct instance using gRPC.
func (a *Asserter) CheckBlankspaceNameViaGRPC(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
clusterName string,
sid topology.ServiceID,
) {
t.Helper()
a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) {
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName)
}, func(_ *retry.R) {})
}
// CheckBlankspaceNameTrafficSplitViaGRPC is like CheckBlankspaceNameViaGRPC
// but it is verifying a relative traffic split.
func (a *Asserter) CheckBlankspaceNameTrafficSplitViaGRPC(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
expect map[string]int,
) {
t.Helper()
got := make(map[string]int)
a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 100, func(_ *retry.R) {
got = make(map[string]int)
}, func(_ *retry.R, name string) {
got[name]++
}, func(r *retry.R) {
assertTrafficSplitFor100Requests(r, got, expect)
})
}
func (a *Asserter) checkBlankspaceNameViaGRPCWithCallback(
t *testing.T,
service *topology.Service,
upstream *topology.Upstream,
count int,
resetFn func(r *retry.R),
attemptFn func(r *retry.R, remoteName string),
checkFn func(r *retry.R),
) {
t.Helper()
require.False(t, upstream.Implied, "helper does not support tproxy yet")
port := upstream.LocalPort
require.True(t, port > 0)
node := service.Node
// We can't use the forward proxy for gRPC yet, so use the exposed port on localhost instead.
exposedPort := node.ExposedPort(port)
require.True(t, exposedPort > 0)
addr := fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort)
multiassert(t, count, resetFn, func(r *retry.R) {
name, err := GetBlankspaceNameViaGRPC(context.Background(), addr)
require.NoError(r, err)
attemptFn(r, name)
}, func(r *retry.R) {
checkFn(r)
})
}
// assertTrafficSplitFor100Requests compares the counts of 100 requests that
// did reach an observed set of destinations (nameCounts) against the expected
// counts of those same services is the same within a fixed difference of 2.
func assertTrafficSplitFor100Requests(t require.TestingT, nameCounts map[string]int, expect map[string]int) {
const (
numRequests = 100
allowedDelta = 2
)
require.Equal(t, numRequests, sumMapValues(nameCounts), "measured traffic was not %d requests", numRequests)
require.Equal(t, numRequests, sumMapValues(expect), "expected traffic was not %d requests", numRequests)
assertTrafficSplit(t, nameCounts, expect, allowedDelta)
}
func sumMapValues(m map[string]int) int {
sum := 0
for _, v := range m {
sum += v
}
return sum
}
// assertTrafficSplit compares the counts of requests that did reach an
// observed set of destinations (nameCounts) against the expected counts of
// those same services is the same within the provided allowedDelta value.
//
// When doing random traffic splits it'll never be perfect so we need the
// wiggle room to avoid having a flaky test.
func assertTrafficSplit(t require.TestingT, nameCounts map[string]int, expect map[string]int, allowedDelta int) {
require.Len(t, nameCounts, len(expect))
for name, expectCount := range expect {
gotCount, ok := nameCounts[name]
require.True(t, ok)
if len(expect) == 1 {
require.Equal(t, expectCount, gotCount)
} else {
require.InDelta(t, expectCount, gotCount, float64(allowedDelta),
"expected %q side of split to have %d requests not %d (e=%d)",
name, expectCount, gotCount, allowedDelta,
)
}
}
}
// multiassert will retry in bulk calling attemptFn count times and following
// that with one last call to checkFn.
//
// It's primary use at the time it was written was to execute a set of requests
// repeatedly to witness where the requests went, and then at the end doing a
// verification of traffic splits (a bit like MAP/REDUCE).
func multiassert(t *testing.T, count int, resetFn, attemptFn, checkFn func(r *retry.R)) {
retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
resetFn(r)
for i := 0; i < count; i++ {
attemptFn(r)
}
checkFn(r)
})
}

View File

@ -0,0 +1,124 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topoutil
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/rboyer/blankspace/blankpb"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// GetBlankspaceNameViaHTTP calls a copy of blankspace once via HTTP and
// retrieves the self-identified name of the instance.
func GetBlankspaceNameViaHTTP(
ctx context.Context,
client *http.Client,
serverAddr string,
actualURL string,
) (string, error) {
url := fmt.Sprintf("http://%s/fetch?url=%s", serverAddr,
url.QueryEscape(actualURL),
)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
res, err := client.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("status code is not 200: %d", res.StatusCode)
}
body, err := io.ReadAll(res.Body)
if err != nil {
return "", err
}
var v struct {
Name string
}
if err := json.Unmarshal(body, &v); err != nil {
return "", err
}
if _, useHTTP2 := client.Transport.(*http2.Transport); useHTTP2 {
if res.ProtoMajor < 2 {
return "", fmt.Errorf("should be using http > 1.x not %d", res.ProtoMajor)
}
}
return v.Name, nil
}
// GetBlankspaceNameViaGRPC calls a copy of blankspace once via gRPC and
// retrieves the self-identified name of the instance.
func GetBlankspaceNameViaGRPC(ctx context.Context, serverAddr string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "", err
}
defer conn.Close()
client := blankpb.NewServerClient(conn)
resp, err := client.Describe(ctx, &blankpb.DescribeRequest{})
if err != nil {
return "", fmt.Errorf("grpc error from Describe: %w", err)
}
return resp.GetName(), nil
}
// GetBlankspaceNameViaTCP calls a copy of blankspace once via tcp and
// retrieves the self-identified name of the instance.
func GetBlankspaceNameViaTCP(ctx context.Context, serverAddr string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
d := net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 250 * time.Millisecond,
}
conn, err := d.DialContext(ctx, "tcp", serverAddr)
if err != nil {
return "", fmt.Errorf("tcp error dialing: %w", err)
}
defer conn.Close()
if _, err := conn.Write([]byte("describe\n")); err != nil {
return "", fmt.Errorf("error sending tcp request: %w", err)
}
scan := bufio.NewScanner(conn)
if !scan.Scan() {
return "", fmt.Errorf("server did not reply")
}
name := strings.TrimSpace(scan.Text())
return name, nil
}

View File

@ -4,6 +4,7 @@
package topoutil package topoutil
import ( import (
"fmt"
"strconv" "strconv"
"github.com/hashicorp/consul/testing/deployer/topology" "github.com/hashicorp/consul/testing/deployer/topology"
@ -20,6 +21,7 @@ func NewFortioServiceWithDefaults(
const ( const (
httpPort = 8080 httpPort = 8080
grpcPort = 8079 grpcPort = 8079
tcpPort = 8078
adminPort = 19000 adminPort = 19000
) )
sid.Normalize() sid.Normalize()
@ -36,19 +38,61 @@ func NewFortioServiceWithDefaults(
"server", "server",
"-http-port", strconv.Itoa(httpPort), "-http-port", strconv.Itoa(httpPort),
"-grpc-port", strconv.Itoa(grpcPort), "-grpc-port", strconv.Itoa(grpcPort),
"-tcp-port", strconv.Itoa(tcpPort),
"-redirect-port", "-disabled", "-redirect-port", "-disabled",
}, },
} }
if nodeVersion == topology.NodeVersionV2 { if nodeVersion == topology.NodeVersionV2 {
svc.Ports = map[string]*topology.Port{ svc.Ports = map[string]*topology.Port{
// TODO(rb/v2): once L7 works in v2 switch these back "http": {Number: httpPort, Protocol: "http"},
"http": {Number: httpPort, Protocol: "tcp"}, "http2": {Number: httpPort, Protocol: "http2"},
"http-alt": {Number: httpPort, Protocol: "tcp"}, "grpc": {Number: grpcPort, Protocol: "grpc"},
"grpc": {Number: grpcPort, Protocol: "tcp"}, "tcp": {Number: tcpPort, Protocol: "tcp"},
// "http": {Number: httpPort, Protocol: "http"}, }
// "http-alt": {Number: httpPort, Protocol: "http"}, } else {
// "grpc": {Number: grpcPort, Protocol: "grpc"}, svc.Port = httpPort
}
if mut != nil {
mut(svc)
}
return svc
}
func NewBlankspaceServiceWithDefaults(
cluster string,
sid topology.ServiceID,
nodeVersion topology.NodeVersion,
mut func(s *topology.Service),
) *topology.Service {
const (
httpPort = 8080
grpcPort = 8079
tcpPort = 8078
adminPort = 19000
)
sid.Normalize()
svc := &topology.Service{
ID: sid,
Image: HashicorpDockerProxy + "/rboyer/blankspace",
EnvoyAdminPort: adminPort,
CheckTCP: "127.0.0.1:" + strconv.Itoa(httpPort),
Command: []string{
"-name", cluster + "::" + sid.String(),
"-http-addr", fmt.Sprintf(":%d", httpPort),
"-grpc-addr", fmt.Sprintf(":%d", grpcPort),
"-tcp-addr", fmt.Sprintf(":%d", tcpPort),
},
}
if nodeVersion == topology.NodeVersionV2 {
svc.Ports = map[string]*topology.Port{
"http": {Number: httpPort, Protocol: "http"},
"http2": {Number: httpPort, Protocol: "http2"},
"grpc": {Number: grpcPort, Protocol: "grpc"},
"tcp": {Number: tcpPort, Protocol: "tcp"},
} }
} else { } else {
svc.Port = httpPort svc.Port = httpPort

View File

@ -0,0 +1,32 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topoutil
import (
"context"
"crypto/tls"
"net"
"net/http"
"golang.org/x/net/http2"
)
// EnableHTTP2 returns a new shallow copy of client that has been tweaked to do
// h2c (cleartext http2).
//
// Note that this clears the Client.Transport.Proxy trick because http2 and
// http proxies are incompatible currently in Go.
func EnableHTTP2(client *http.Client) *http.Client {
// Shallow copy, and swap the transport
client2 := *client
client = &client2
client.Transport = &http2.Transport{
AllowHTTP: true,
DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, network, addr)
},
}
return client
}

View File

@ -102,6 +102,8 @@ func DockerImages(
built := make(map[string]struct{}) built := make(map[string]struct{})
for _, c := range t.Clusters { for _, c := range t.Clusters {
for _, n := range c.Nodes { for _, n := range c.Nodes {
needsTproxy := n.NeedsTransparentProxy()
joint := n.Images.EnvoyConsulImage() joint := n.Images.EnvoyConsulImage()
if _, ok := built[joint]; joint != "" && !ok { if _, ok := built[joint]; joint != "" && !ok {
logger.Info("building envoy+consul image", "image", joint) logger.Info("building envoy+consul image", "image", joint)
@ -145,7 +147,7 @@ func DockerImages(
} }
cdpTproxy := n.Images.LocalDataplaneTProxyImage() cdpTproxy := n.Images.LocalDataplaneTProxyImage()
if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok { if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok && needsTproxy {
logger.Info("building image", "image", cdpTproxy) logger.Info("building image", "image", cdpTproxy)
logw := logger.Named("docker_dataplane_tproxy").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug}) logw := logger.Named("docker_dataplane_tproxy").StandardWriter(&hclog.StandardLoggerOptions{ForceLevel: hclog.Debug})
err := run.DockerExecWithStderr(context.TODO(), []string{ err := run.DockerExecWithStderr(context.TODO(), []string{

View File

@ -262,7 +262,9 @@ func (g *Generator) Generate(step Step) error {
addImage("", node.Images.Consul) addImage("", node.Images.Consul)
addImage("", node.Images.EnvoyConsulImage()) addImage("", node.Images.EnvoyConsulImage())
addImage("", node.Images.LocalDataplaneImage()) addImage("", node.Images.LocalDataplaneImage())
if node.NeedsTransparentProxy() {
addImage("", node.Images.LocalDataplaneTProxyImage()) addImage("", node.Images.LocalDataplaneTProxyImage())
}
if node.IsAgent() { if node.IsAgent() {
addVolume(node.DockerName()) addVolume(node.DockerName())

View File

@ -29,10 +29,7 @@ func (i Images) LocalDataplaneImage() string {
tag = "latest" tag = "latest"
} }
repo, name, ok := strings.Cut(img, "/") name := strings.ReplaceAll(img, "/", "-")
if ok {
name = repo + "-" + name
}
// ex: local/hashicorp-consul-dataplane:1.1.0 // ex: local/hashicorp-consul-dataplane:1.1.0
return "local/" + name + ":" + tag return "local/" + name + ":" + tag
@ -60,19 +57,8 @@ func spliceImageNamesAndTags(base1, base2, nameSuffix string) string {
tag2 = "latest" tag2 = "latest"
} }
repo1, name1, ok1 := strings.Cut(img1, "/") name1 := strings.ReplaceAll(img1, "/", "-")
repo2, name2, ok2 := strings.Cut(img2, "/") name2 := strings.ReplaceAll(img2, "/", "-")
if ok1 {
name1 = repo1 + "-" + name1
} else {
name1 = repo1
}
if ok2 {
name2 = repo2 + "-" + name2
} else {
name2 = repo2
}
if nameSuffix != "" { if nameSuffix != "" {
nameSuffix = "-" + nameSuffix nameSuffix = "-" + nameSuffix

View File

@ -679,6 +679,15 @@ func (n *Node) SortedServices() []*Service {
return out return out
} }
func (n *Node) NeedsTransparentProxy() bool {
for _, svc := range n.Services {
if svc.EnableTransparentProxy {
return true
}
}
return false
}
// DigestExposedPorts returns true if it was changed. // DigestExposedPorts returns true if it was changed.
func (n *Node) DigestExposedPorts(ports map[int]int) bool { func (n *Node) DigestExposedPorts(ports map[int]int) bool {
if reflect.DeepEqual(n.usedPorts, ports) { if reflect.DeepEqual(n.usedPorts, ports) {